diff --git a/Cargo.lock b/Cargo.lock index d5752bfb61..b132c6d3fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,7 @@ dependencies = [ "common-error", "common-time", "datatypes", + "greptime-proto", "prost 0.11.6", "snafu", "tonic", @@ -2968,6 +2969,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "greptime-proto" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=966161508646f575801bcf05f47ed283ec231d68#966161508646f575801bcf05f47ed283ec231d68" +dependencies = [ + "prost 0.11.6", + "tonic", + "tonic-build", +] + [[package]] name = "h2" version = "0.3.15" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7678f30d5a..7892210163 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,6 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "966161508646f575801bcf05f47ed283ec231d68" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/api/build.rs b/src/api/build.rs deleted file mode 100644 index 1844fae618..0000000000 --- a/src/api/build.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -fn main() { - tonic_build::configure() - .compile( - &[ - "greptime/v1/database.proto", - "greptime/v1/meta/common.proto", - "greptime/v1/meta/heartbeat.proto", - "greptime/v1/meta/route.proto", - "greptime/v1/meta/store.proto", - "greptime/v1/meta/cluster.proto", - "prometheus/remote/remote.proto", - ], - &["."], - ) - .expect("compile proto"); -} diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto deleted file mode 100644 index c1bda12142..0000000000 --- a/src/api/greptime/v1/column.proto +++ /dev/null @@ -1,85 +0,0 @@ -syntax = "proto3"; - -package greptime.v1; - -message Column { - string column_name = 1; - - enum SemanticType { - TAG = 0; - FIELD = 1; - TIMESTAMP = 2; - } - SemanticType semantic_type = 2; - - message Values { - repeated int32 i8_values = 1; - repeated int32 i16_values = 2; - repeated int32 i32_values = 3; - repeated int64 i64_values = 4; - - repeated uint32 u8_values = 5; - repeated uint32 u16_values = 6; - repeated uint32 u32_values = 7; - repeated uint64 u64_values = 8; - - repeated float f32_values = 9; - repeated double f64_values = 10; - - repeated bool bool_values = 11; - repeated bytes binary_values = 12; - repeated string string_values = 13; - - repeated int32 date_values = 14; - repeated int64 datetime_values = 15; - repeated int64 ts_second_values = 16; - repeated int64 ts_millisecond_values = 17; - repeated int64 ts_microsecond_values = 18; - repeated int64 ts_nanosecond_values = 19; - } - // The array of non-null values in this column. - // - // For example: suppose there is a column "foo" that contains some int32 values (1, 2, 3, 4, 5, null, 7, 8, 9, null); - // column: - // column_name: foo - // semantic_type: Tag - // values: 1, 2, 3, 4, 5, 7, 8, 9 - // null_masks: 00100000 00000010 - Values values = 3; - - // Mask maps the positions of null values. - // If a bit in null_mask is 1, it indicates that the column value at that position is null. - bytes null_mask = 4; - - // Helpful in creating vector from column. - ColumnDataType datatype = 5; -} - -message ColumnDef { - string name = 1; - ColumnDataType datatype = 2; - bool is_nullable = 3; - bytes default_constraint = 4; -} - -enum ColumnDataType { - BOOLEAN = 0; - INT8 = 1; - INT16 = 2; - INT32 = 3; - INT64 = 4; - UINT8 = 5; - UINT16 = 6; - UINT32 = 7; - UINT64 = 8; - FLOAT32 = 9; - FLOAT64 = 10; - BINARY = 11; - STRING = 12; - DATE = 13; - DATETIME = 14; - TIMESTAMP_SECOND = 15; - TIMESTAMP_MILLISECOND = 16; - TIMESTAMP_MICROSECOND = 17; - TIMESTAMP_NANOSECOND = 18; -} diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto deleted file mode 100644 index 354cd9861e..0000000000 --- a/src/api/greptime/v1/database.proto +++ /dev/null @@ -1,52 +0,0 @@ -syntax = "proto3"; - -package greptime.v1; - -import "greptime/v1/ddl.proto"; -import "greptime/v1/column.proto"; - -message RequestHeader { - // The `catalog` that is selected to be used in this request. - string catalog = 1; - // The `schema` that is selected to be used in this request. - string schema = 2; -} - -message GreptimeRequest { - RequestHeader header = 1; - oneof request { - InsertRequest insert = 2; - QueryRequest query = 3; - DdlRequest ddl = 4; - } -} - -message QueryRequest { - oneof query { - string sql = 1; - bytes logical_plan = 2; - } -} - -message InsertRequest { - string table_name = 1; - - // Data is represented here. - repeated Column columns = 3; - - // The row_count of all columns, which include null and non-null values. - // - // Note: the row_count of all columns in a InsertRequest must be same. - uint32 row_count = 4; - - // The region number of current insert request. - uint32 region_number = 5; -} - -message AffectedRows { - uint32 value = 1; -} - -message FlightMetadata { - AffectedRows affected_rows = 1; -} diff --git a/src/api/greptime/v1/ddl.proto b/src/api/greptime/v1/ddl.proto deleted file mode 100644 index cb7678e981..0000000000 --- a/src/api/greptime/v1/ddl.proto +++ /dev/null @@ -1,79 +0,0 @@ -syntax = "proto3"; - -package greptime.v1; - -import "greptime/v1/column.proto"; - -// "Data Definition Language" requests, that create, modify or delete the database structures but not the data. -// `DdlRequest` could carry more information than plain SQL, for example, the "table_id" in `CreateTableExpr`. -// So create a new DDL expr if you need it. -message DdlRequest { - oneof expr { - CreateDatabaseExpr create_database = 1; - CreateTableExpr create_table = 2; - AlterExpr alter = 3; - DropTableExpr drop_table = 4; - } -} - -message CreateTableExpr { - string catalog_name = 1; - string schema_name = 2; - string table_name = 3; - string desc = 4; - repeated ColumnDef column_defs = 5; - string time_index = 6; - repeated string primary_keys = 7; - bool create_if_not_exists = 8; - map table_options = 9; - TableId table_id = 10; - repeated uint32 region_ids = 11; -} - -message AlterExpr { - string catalog_name = 1; - string schema_name = 2; - string table_name = 3; - oneof kind { - AddColumns add_columns = 4; - DropColumns drop_columns = 5; - RenameTable rename_table = 6; - } -} - -message DropTableExpr { - string catalog_name = 1; - string schema_name = 2; - string table_name = 3; -} - -message CreateDatabaseExpr { - //TODO(hl): maybe rename to schema_name? - string database_name = 1; - bool create_if_not_exists = 2; -} - -message AddColumns { - repeated AddColumn add_columns = 1; -} - -message DropColumns { - repeated DropColumn drop_columns = 1; -} - -message RenameTable { - string new_table_name = 1; -} - -message AddColumn { - ColumnDef column_def = 1; - bool is_key = 2; -} - -message DropColumn { - string name = 1; -} - -message TableId { - uint32 id = 1; -} diff --git a/src/api/greptime/v1/meta/cluster.proto b/src/api/greptime/v1/meta/cluster.proto deleted file mode 100644 index 7bacb0b1ab..0000000000 --- a/src/api/greptime/v1/meta/cluster.proto +++ /dev/null @@ -1,27 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.meta; - -import "greptime/v1/meta/common.proto"; -import "greptime/v1/meta/store.proto"; - -// Cluster service is used for communication between meta nodes. -service Cluster { - // Batch get kvs by input keys from leader's in_memory kv store. - rpc BatchGet(BatchGetRequest) returns (BatchGetResponse); - - // Range get the kvs from leader's in_memory kv store. - rpc Range(RangeRequest) returns (RangeResponse); -} - -message BatchGetRequest { - RequestHeader header = 1; - - repeated bytes keys = 2; -} - -message BatchGetResponse { - ResponseHeader header = 1; - - repeated KeyValue kvs = 2; -} diff --git a/src/api/greptime/v1/meta/common.proto b/src/api/greptime/v1/meta/common.proto deleted file mode 100644 index af0fcd47e7..0000000000 --- a/src/api/greptime/v1/meta/common.proto +++ /dev/null @@ -1,48 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.meta; - -message RequestHeader { - uint64 protocol_version = 1; - // cluster_id is the ID of the cluster which be sent to. - uint64 cluster_id = 2; - // member_id is the ID of the sender server. - uint64 member_id = 3; -} - -message ResponseHeader { - uint64 protocol_version = 1; - // cluster_id is the ID of the cluster which sent the response. - uint64 cluster_id = 2; - Error error = 3; -} - -message Error { - int32 code = 1; - string err_msg = 2; -} - -message Peer { - uint64 id = 1; - string addr = 2; -} - -message TableName { - string catalog_name = 1; - string schema_name = 2; - string table_name = 3; -} - -message TimeInterval { - // The unix timestamp in millis of the start of this period. - uint64 start_timestamp_millis = 1; - // The unix timestamp in millis of the end of this period. - uint64 end_timestamp_millis = 2; -} - -message KeyValue { - // key is the key in bytes. An empty key is not allowed. - bytes key = 1; - // value is the value held by the key, in bytes. - bytes value = 2; -} diff --git a/src/api/greptime/v1/meta/heartbeat.proto b/src/api/greptime/v1/meta/heartbeat.proto deleted file mode 100644 index 91a8bcae55..0000000000 --- a/src/api/greptime/v1/meta/heartbeat.proto +++ /dev/null @@ -1,92 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.meta; - -import "greptime/v1/meta/common.proto"; - -service Heartbeat { - // Heartbeat, there may be many contents of the heartbeat, such as: - // 1. Metadata to be registered to meta server and discoverable by other nodes. - // 2. Some performance metrics, such as Load, CPU usage, etc. - // 3. The number of computing tasks being executed. - rpc Heartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse) {} - - // Ask leader's endpoint. - rpc AskLeader(AskLeaderRequest) returns (AskLeaderResponse) {} -} - -message HeartbeatRequest { - RequestHeader header = 1; - - // Self peer - Peer peer = 2; - // Leader node - bool is_leader = 3; - // Actually reported time interval - TimeInterval report_interval = 4; - // Node stat - NodeStat node_stat = 5; - // Region stats on this node - repeated RegionStat region_stats = 6; - // Follower nodes and stats, empty on follower nodes - repeated ReplicaStat replica_stats = 7; -} - -message NodeStat { - // The read capacity units during this period - int64 rcus = 1; - // The write capacity units during this period - int64 wcus = 2; - // How many tables on this node - int64 table_num = 3; - // How many regions on this node - int64 region_num = 4; - - double cpu_usage = 5; - double load = 6; - // Read disk IO on this node - double read_io_rate = 7; - // Write disk IO on this node - double write_io_rate = 8; - - // Others - map attrs = 100; -} - -message RegionStat { - uint64 region_id = 1; - TableName table_name = 2; - // The read capacity units during this period - int64 rcus = 3; - // The write capacity units during this period - int64 wcus = 4; - // Approximate bytes of this region - int64 approximate_bytes = 5; - // Approximate number of rows in this region - int64 approximate_rows = 6; - - // Others - map attrs = 100; -} - -message ReplicaStat { - Peer peer = 1; - bool in_sync = 2; - bool is_learner = 3; -} - -message HeartbeatResponse { - ResponseHeader header = 1; - - repeated bytes payload = 2; -} - -message AskLeaderRequest { - RequestHeader header = 1; -} - -message AskLeaderResponse { - ResponseHeader header = 1; - - Peer leader = 2; -} diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto deleted file mode 100644 index 1b1cc67507..0000000000 --- a/src/api/greptime/v1/meta/route.proto +++ /dev/null @@ -1,99 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.meta; - -import "greptime/v1/meta/common.proto"; - -service Router { - rpc Create(CreateRequest) returns (RouteResponse) {} - - // Fetch routing information for tables. The smallest unit is the complete - // routing information(all regions) of a table. - // - // ```text - // table_1 - // table_name - // table_schema - // regions - // region_1 - // leader_peer - // follower_peer_1, follower_peer_2 - // region_2 - // leader_peer - // follower_peer_1, follower_peer_2, follower_peer_3 - // region_xxx - // table_2 - // ... - // ``` - // - rpc Route(RouteRequest) returns (RouteResponse) {} - - rpc Delete(DeleteRequest) returns (RouteResponse) {} -} - -message CreateRequest { - RequestHeader header = 1; - - TableName table_name = 2; - repeated Partition partitions = 3; - bytes table_info = 4; -} - -message RouteRequest { - RequestHeader header = 1; - - repeated TableName table_names = 2; -} - -message DeleteRequest { - RequestHeader header = 1; - - TableName table_name = 2; -} - -message RouteResponse { - ResponseHeader header = 1; - - repeated Peer peers = 2; - repeated TableRoute table_routes = 3; -} - -message TableRoute { - Table table = 1; - repeated RegionRoute region_routes = 2; -} - -message RegionRoute { - Region region = 1; - // single leader node for write task - uint64 leader_peer_index = 2; - // multiple follower nodes for read task - repeated uint64 follower_peer_indexes = 3; -} - -message Table { - uint64 id = 1; - TableName table_name = 2; - bytes table_schema = 3; -} - -message Region { - // TODO(LFC): Maybe use message RegionNumber? - uint64 id = 1; - string name = 2; - Partition partition = 3; - - map attrs = 100; -} - -// PARTITION `region_name` VALUES LESS THAN (value_list) -message Partition { - repeated bytes column_list = 1; - repeated bytes value_list = 2; -} - -// This message is only for saving into store. -message TableRouteValue { - repeated Peer peers = 1; - TableRoute table_route = 2; -} diff --git a/src/api/greptime/v1/meta/store.proto b/src/api/greptime/v1/meta/store.proto deleted file mode 100644 index cd951f454e..0000000000 --- a/src/api/greptime/v1/meta/store.proto +++ /dev/null @@ -1,159 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.meta; - -import "greptime/v1/meta/common.proto"; - -service Store { - // Range gets the keys in the range from the key-value store. - rpc Range(RangeRequest) returns (RangeResponse); - - // Put puts the given key into the key-value store. - rpc Put(PutRequest) returns (PutResponse); - - // BatchPut atomically puts the given keys into the key-value store. - rpc BatchPut(BatchPutRequest) returns (BatchPutResponse); - - // CompareAndPut atomically puts the value to the given updated - // value if the current value == the expected value. - rpc CompareAndPut(CompareAndPutRequest) returns (CompareAndPutResponse); - - // DeleteRange deletes the given range from the key-value store. - rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse); - - // MoveValue atomically renames the key to the given updated key. - rpc MoveValue(MoveValueRequest) returns (MoveValueResponse); -} - -message RangeRequest { - RequestHeader header = 1; - - // key is the first key for the range, If range_end is not given, the - // request only looks up key. - bytes key = 2; - // range_end is the upper bound on the requested range [key, range_end). - // If range_end is '\0', the range is all keys >= key. - // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), - // then the range request gets all keys prefixed with key. - // If both key and range_end are '\0', then the range request returns all - // keys. - bytes range_end = 3; - // limit is a limit on the number of keys returned for the request. When - // limit is set to 0, it is treated as no limit. - int64 limit = 4; - // keys_only when set returns only the keys and not the values. - bool keys_only = 5; -} - -message RangeResponse { - ResponseHeader header = 1; - - // kvs is the list of key-value pairs matched by the range request. - repeated KeyValue kvs = 2; - // more indicates if there are more keys to return in the requested range. - bool more = 3; -} - -message PutRequest { - RequestHeader header = 1; - - // key is the key, in bytes, to put into the key-value store. - bytes key = 2; - // value is the value, in bytes, to associate with the key in the - // key-value store. - bytes value = 3; - // If prev_kv is set, gets the previous key-value pair before changing it. - // The previous key-value pair will be returned in the put response. - bool prev_kv = 4; -} - -message PutResponse { - ResponseHeader header = 1; - - // If prev_kv is set in the request, the previous key-value pair will be - // returned. - KeyValue prev_kv = 2; -} - -message BatchPutRequest { - RequestHeader header = 1; - - repeated KeyValue kvs = 2; - // If prev_kv is set, gets the previous key-value pairs before changing it. - // The previous key-value pairs will be returned in the batch put response. - bool prev_kv = 3; -} - -message BatchPutResponse { - ResponseHeader header = 1; - - // If prev_kv is set in the request, the previous key-value pairs will be - // returned. - repeated KeyValue prev_kvs = 2; -} - -message CompareAndPutRequest { - RequestHeader header = 1; - - // key is the key, in bytes, to put into the key-value store. - bytes key = 2; - // expect is the previous value, in bytes - bytes expect = 3; - // value is the value, in bytes, to associate with the key in the - // key-value store. - bytes value = 4; -} - -message CompareAndPutResponse { - ResponseHeader header = 1; - - bool success = 2; - KeyValue prev_kv = 3; -} - -message DeleteRangeRequest { - RequestHeader header = 1; - - // key is the first key to delete in the range. - bytes key = 2; - // range_end is the key following the last key to delete for the range - // [key, range_end). - // If range_end is not given, the range is defined to contain only the key - // argument. - // If range_end is one bit larger than the given key, then the range is all - // the keys with the prefix (the given key). - // If range_end is '\0', the range is all keys greater than or equal to the - // key argument. - bytes range_end = 3; - // If prev_kv is set, gets the previous key-value pairs before deleting it. - // The previous key-value pairs will be returned in the delete response. - bool prev_kv = 4; -} - -message DeleteRangeResponse { - ResponseHeader header = 1; - - // deleted is the number of keys deleted by the delete range request. - int64 deleted = 2; - // If prev_kv is set in the request, the previous key-value pairs will be - // returned. - repeated KeyValue prev_kvs = 3; -} - -message MoveValueRequest { - RequestHeader header = 1; - - // If from_key dose not exist, return the value of to_key (if it exists). - // If from_key exists, move the value of from_key to to_key (i.e. rename), - // and return the value. - bytes from_key = 2; - bytes to_key = 3; -} - -message MoveValueResponse { - ResponseHeader header = 1; - - // If from_key dose not exist, return the value of to_key (if it exists). - // If from_key exists, return the value of from_key. - KeyValue kv = 2; -} diff --git a/src/api/prometheus/remote/remote.proto b/src/api/prometheus/remote/remote.proto deleted file mode 100644 index 623f363055..0000000000 --- a/src/api/prometheus/remote/remote.proto +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2016 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompb"; - -import "prometheus/remote/types.proto"; - -message WriteRequest { - repeated prometheus.TimeSeries timeseries = 1; - // Cortex uses this field to determine the source of the write request. - // We reserve it to avoid any compatibility issues. - reserved 2; - repeated prometheus.MetricMetadata metadata = 3; -} - -// ReadRequest represents a remote read request. -message ReadRequest { - repeated Query queries = 1; - - enum ResponseType { - // Server will return a single ReadResponse message with matched series that includes list of raw samples. - // It's recommended to use streamed response types instead. - // - // Response headers: - // Content-Type: "application/x-protobuf" - // Content-Encoding: "snappy" - SAMPLES = 0; - // Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series. - // Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum. - // - // Response headers: - // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" - // Content-Encoding: "" - STREAMED_XOR_CHUNKS = 1; - } - - // accepted_response_types allows negotiating the content type of the response. - // - // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is - // implemented by server, error is returned. - // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used. - repeated ResponseType accepted_response_types = 2; -} - -// ReadResponse is a response when response_type equals SAMPLES. -message ReadResponse { - // In same order as the request's queries. - repeated QueryResult results = 1; -} - -message Query { - int64 start_timestamp_ms = 1; - int64 end_timestamp_ms = 2; - repeated prometheus.LabelMatcher matchers = 3; - prometheus.ReadHints hints = 4; -} - -message QueryResult { - // Samples within a time series must be ordered by time. - repeated prometheus.TimeSeries timeseries = 1; -} - -// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. -// We strictly stream full series after series, optionally split by time. This means that a single frame can contain -// partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. -message ChunkedReadResponse { - repeated prometheus.ChunkedSeries chunked_series = 1; - - // query_index represents an index of the query from ReadRequest.queries these chunks relates to. - int64 query_index = 2; -} diff --git a/src/api/prometheus/remote/types.proto b/src/api/prometheus/remote/types.proto deleted file mode 100644 index 0d17e88d29..0000000000 --- a/src/api/prometheus/remote/types.proto +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2017 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompb"; - -message MetricMetadata { - enum MetricType { - UNKNOWN = 0; - COUNTER = 1; - GAUGE = 2; - HISTOGRAM = 3; - GAUGEHISTOGRAM = 4; - SUMMARY = 5; - INFO = 6; - STATESET = 7; - } - - // Represents the metric type, these match the set from Prometheus. - // Refer to model/textparse/interface.go for details. - MetricType type = 1; - string metric_family_name = 2; - string help = 4; - string unit = 5; -} - -message Sample { - double value = 1; - // timestamp is in ms format, see model/timestamp/timestamp.go for - // conversion from time.Time to Prometheus timestamp. - int64 timestamp = 2; -} - -message Exemplar { - // Optional, can be empty. - repeated Label labels = 1; - double value = 2; - // timestamp is in ms format, see model/timestamp/timestamp.go for - // conversion from time.Time to Prometheus timestamp. - int64 timestamp = 3; -} - -// TimeSeries represents samples and labels for a single time series. -message TimeSeries { - // For a timeseries to be valid, and for the samples and exemplars - // to be ingested by the remote system properly, the labels field is required. - repeated Label labels = 1; - repeated Sample samples = 2; - repeated Exemplar exemplars = 3; -} - -message Label { - string name = 1; - string value = 2; -} - -message Labels { - repeated Label labels = 1; -} - -// Matcher specifies a rule, which can match or set of labels or not. -message LabelMatcher { - enum Type { - EQ = 0; - NEQ = 1; - RE = 2; - NRE = 3; - } - Type type = 1; - string name = 2; - string value = 3; -} - -message ReadHints { - int64 step_ms = 1; // Query step size in milliseconds. - string func = 2; // String representation of surrounding function or aggregation. - int64 start_ms = 3; // Start time in milliseconds. - int64 end_ms = 4; // End time in milliseconds. - repeated string grouping = 5; // List of label names used in aggregation. - bool by = 6; // Indicate whether it is without or by. - int64 range_ms = 7; // Range vector selector range in milliseconds. -} - -// Chunk represents a TSDB chunk. -// Time range [min, max] is inclusive. -message Chunk { - int64 min_time_ms = 1; - int64 max_time_ms = 2; - - // We require this to match chunkenc.Encoding. - enum Encoding { - UNKNOWN = 0; - XOR = 1; - } - Encoding type = 3; - bytes data = 4; -} - -// ChunkedSeries represents single, encoded time series. -message ChunkedSeries { - // Labels should be sorted. - repeated Label labels = 1; - // Chunks will be in start time order and may overlap. - repeated Chunk chunks = 2; -} diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 8bd50bcad9..48debebb5a 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -105,125 +105,121 @@ impl TryFrom for ColumnDataTypeWrapper { } } -impl Values { - pub fn with_capacity(datatype: ColumnDataType, capacity: usize) -> Self { - match datatype { - ColumnDataType::Boolean => Values { - bool_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Int8 => Values { - i8_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Int16 => Values { - i16_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Int32 => Values { - i32_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Int64 => Values { - i64_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Uint8 => Values { - u8_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Uint16 => Values { - u16_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Uint32 => Values { - u32_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Uint64 => Values { - u64_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Float32 => Values { - f32_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Float64 => Values { - f64_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Binary => Values { - binary_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::String => Values { - string_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Date => Values { - date_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::Datetime => Values { - datetime_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::TimestampSecond => Values { - ts_second_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::TimestampMillisecond => Values { - ts_millisecond_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::TimestampMicrosecond => Values { - ts_microsecond_values: Vec::with_capacity(capacity), - ..Default::default() - }, - ColumnDataType::TimestampNanosecond => Values { - ts_nanosecond_values: Vec::with_capacity(capacity), - ..Default::default() - }, - } +pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values { + match datatype { + ColumnDataType::Boolean => Values { + bool_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int8 => Values { + i8_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int16 => Values { + i16_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int32 => Values { + i32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Int64 => Values { + i64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint8 => Values { + u8_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint16 => Values { + u16_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint32 => Values { + u32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Uint64 => Values { + u64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Float32 => Values { + f32_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Float64 => Values { + f64_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Binary => Values { + binary_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::String => Values { + string_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Date => Values { + date_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Datetime => Values { + datetime_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimestampSecond => Values { + ts_second_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimestampMillisecond => Values { + ts_millisecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimestampMicrosecond => Values { + ts_microsecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimestampNanosecond => Values { + ts_nanosecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } -impl Column { - // The type of vals must be same. - pub fn push_vals(&mut self, origin_count: usize, vector: VectorRef) { - let values = self.values.get_or_insert_with(Values::default); - let mut null_mask = BitVec::from_slice(&self.null_mask); - let len = vector.len(); - null_mask.reserve_exact(origin_count + len); - null_mask.extend(BitVec::repeat(false, len)); +// The type of vals must be same. +pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { + let values = column.values.get_or_insert_with(Values::default); + let mut null_mask = BitVec::from_slice(&column.null_mask); + let len = vector.len(); + null_mask.reserve_exact(origin_count + len); + null_mask.extend(BitVec::repeat(false, len)); - (0..len).into_iter().for_each(|idx| match vector.get(idx) { - Value::Null => null_mask.set(idx + origin_count, true), - Value::Boolean(val) => values.bool_values.push(val), - Value::UInt8(val) => values.u8_values.push(val.into()), - Value::UInt16(val) => values.u16_values.push(val.into()), - Value::UInt32(val) => values.u32_values.push(val), - Value::UInt64(val) => values.u64_values.push(val), - Value::Int8(val) => values.i8_values.push(val.into()), - Value::Int16(val) => values.i16_values.push(val.into()), - Value::Int32(val) => values.i32_values.push(val), - Value::Int64(val) => values.i64_values.push(val), - Value::Float32(val) => values.f32_values.push(*val), - Value::Float64(val) => values.f64_values.push(*val), - Value::String(val) => values.string_values.push(val.as_utf8().to_string()), - Value::Binary(val) => values.binary_values.push(val.to_vec()), - Value::Date(val) => values.date_values.push(val.val()), - Value::DateTime(val) => values.datetime_values.push(val.val()), - Value::Timestamp(val) => match val.unit() { - TimeUnit::Second => values.ts_second_values.push(val.value()), - TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()), - }, - Value::List(_) => unreachable!(), - }); - self.null_mask = null_mask.into_vec(); - } + (0..len).into_iter().for_each(|idx| match vector.get(idx) { + Value::Null => null_mask.set(idx + origin_count, true), + Value::Boolean(val) => values.bool_values.push(val), + Value::UInt8(val) => values.u8_values.push(val.into()), + Value::UInt16(val) => values.u16_values.push(val.into()), + Value::UInt32(val) => values.u32_values.push(val), + Value::UInt64(val) => values.u64_values.push(val), + Value::Int8(val) => values.i8_values.push(val.into()), + Value::Int16(val) => values.i16_values.push(val.into()), + Value::Int32(val) => values.i32_values.push(val), + Value::Int64(val) => values.i64_values.push(val), + Value::Float32(val) => values.f32_values.push(*val), + Value::Float64(val) => values.f64_values.push(*val), + Value::String(val) => values.string_values.push(val.as_utf8().to_string()), + Value::Binary(val) => values.binary_values.push(val.to_vec()), + Value::Date(val) => values.date_values.push(val.val()), + Value::DateTime(val) => values.datetime_values.push(val.val()), + Value::Timestamp(val) => match val.unit() { + TimeUnit::Second => values.ts_second_values.push(val.value()), + TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()), + TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()), + }, + Value::List(_) => unreachable!(), + }); + column.null_mask = null_mask.into_vec(); } #[cfg(test)] @@ -239,59 +235,59 @@ mod tests { #[test] fn test_values_with_capacity() { - let values = Values::with_capacity(ColumnDataType::Int8, 2); + let values = values_with_capacity(ColumnDataType::Int8, 2); let values = values.i8_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Int32, 2); + let values = values_with_capacity(ColumnDataType::Int32, 2); let values = values.i32_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Int64, 2); + let values = values_with_capacity(ColumnDataType::Int64, 2); let values = values.i64_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Uint8, 2); + let values = values_with_capacity(ColumnDataType::Uint8, 2); let values = values.u8_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Uint32, 2); + let values = values_with_capacity(ColumnDataType::Uint32, 2); let values = values.u32_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Uint64, 2); + let values = values_with_capacity(ColumnDataType::Uint64, 2); let values = values.u64_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Float32, 2); + let values = values_with_capacity(ColumnDataType::Float32, 2); let values = values.f32_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Float64, 2); + let values = values_with_capacity(ColumnDataType::Float64, 2); let values = values.f64_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Binary, 2); + let values = values_with_capacity(ColumnDataType::Binary, 2); let values = values.binary_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Boolean, 2); + let values = values_with_capacity(ColumnDataType::Boolean, 2); let values = values.bool_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::String, 2); + let values = values_with_capacity(ColumnDataType::String, 2); let values = values.string_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Date, 2); + let values = values_with_capacity(ColumnDataType::Date, 2); let values = values.date_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::Datetime, 2); + let values = values_with_capacity(ColumnDataType::Datetime, 2); let values = values.datetime_values; assert_eq!(2, values.capacity()); - let values = Values::with_capacity(ColumnDataType::TimestampMillisecond, 2); + let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2); let values = values.ts_millisecond_values; assert_eq!(2, values.capacity()); } @@ -462,28 +458,28 @@ mod tests { }; let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3])); - column.push_vals(3, vector); + push_vals(&mut column, 3, vector); assert_eq!( vec![1, 2, 3], column.values.as_ref().unwrap().ts_nanosecond_values ); let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6])); - column.push_vals(3, vector); + push_vals(&mut column, 3, vector); assert_eq!( vec![4, 5, 6], column.values.as_ref().unwrap().ts_millisecond_values ); let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9])); - column.push_vals(3, vector); + push_vals(&mut column, 3, vector); assert_eq!( vec![7, 8, 9], column.values.as_ref().unwrap().ts_microsecond_values ); let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12])); - column.push_vals(3, vector); + push_vals(&mut column, 3, vector); assert_eq!( vec![10, 11, 12], column.values.as_ref().unwrap().ts_second_values @@ -507,7 +503,7 @@ mod tests { let row_count = 4; let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)])); - column.push_vals(row_count, vector); + push_vals(&mut column, row_count, vector); // Some(false), None, Some(true), Some(true), Some(true), None, Some(false) let bool_values = column.values.unwrap().bool_values; assert_eq!(vec![false, true, true, true, false], bool_values); diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 77f58b383c..f642aea3f6 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -14,8 +14,13 @@ pub mod error; pub mod helper; -pub mod prometheus; -pub mod serde; + +pub mod prometheus { + pub mod remote { + pub use greptime_proto::prometheus::remote::*; + } +} + pub mod v1; pub use prost::DecodeError; diff --git a/src/api/src/prometheus.rs b/src/api/src/prometheus.rs deleted file mode 100644 index 01f6019945..0000000000 --- a/src/api/src/prometheus.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![allow(clippy::derive_partial_eq_without_eq)] - -pub mod remote { - tonic::include_proto!("prometheus"); -} diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs deleted file mode 100644 index ce7967cf98..0000000000 --- a/src/api/src/serde.rs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use prost::DecodeError; -use prost::Message; - -use crate::v1::meta::TableRouteValue; - -macro_rules! impl_convert_with_bytes { - ($data_type: ty) => { - impl From<$data_type> for Vec { - fn from(entity: $data_type) -> Self { - entity.encode_to_vec() - } - } - - impl TryFrom<&[u8]> for $data_type { - type Error = DecodeError; - - fn try_from(value: &[u8]) -> Result { - <$data_type>::decode(value.as_ref()) - } - } - }; -} - -impl_convert_with_bytes!(TableRouteValue); diff --git a/src/api/src/v1.rs b/src/api/src/v1.rs index 078733fd1e..fff5f3d145 100644 --- a/src/api/src/v1.rs +++ b/src/api/src/v1.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(clippy::derive_partial_eq_without_eq)] -tonic::include_proto!("greptime.v1"); +pub mod column_def; -mod column_def; -pub mod meta; +pub mod meta { + pub use greptime_proto::v1::meta::*; +} + +pub use greptime_proto::v1::*; diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 1c7a4fae8b..aa15974c12 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -19,21 +19,24 @@ use crate::error::{self, Result}; use crate::helper::ColumnDataTypeWrapper; use crate::v1::ColumnDef; -impl ColumnDef { - pub fn try_as_column_schema(&self) -> Result { - let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?; +pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { + let data_type = ColumnDataTypeWrapper::try_new(column_def.datatype)?; - let constraint = if self.default_constraint.is_empty() { - None - } else { - Some( - ColumnDefaultConstraint::try_from(self.default_constraint.as_slice()) - .context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?, - ) - }; + let constraint = if column_def.default_constraint.is_empty() { + None + } else { + Some( + ColumnDefaultConstraint::try_from(column_def.default_constraint.as_slice()).context( + error::ConvertColumnDefaultConstraintSnafu { + column: &column_def.name, + }, + )?, + ) + }; - ColumnSchema::new(&self.name, data_type.into(), self.is_nullable) - .with_default_constraint(constraint) - .context(error::InvalidColumnDefaultConstraintSnafu { column: &self.name }) - } + ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable) + .with_default_constraint(constraint) + .context(error::InvalidColumnDefaultConstraintSnafu { + column: &column_def.name, + }) } diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs deleted file mode 100644 index dafe6f69c3..0000000000 --- a/src/api/src/v1/meta.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -tonic::include_proto!("greptime.v1.meta"); - -use std::collections::HashMap; -use std::hash::{Hash, Hasher}; - -pub const PROTOCOL_VERSION: u64 = 1; - -#[derive(Default)] -pub struct PeerDict { - peers: HashMap, - index: usize, -} - -impl PeerDict { - pub fn get_or_insert(&mut self, peer: Peer) -> usize { - let index = self.peers.entry(peer).or_insert_with(|| { - let v = self.index; - self.index += 1; - v - }); - - *index - } - - pub fn into_peers(self) -> Vec { - let mut array = vec![Peer::default(); self.index]; - for (p, i) in self.peers { - array[i] = p; - } - array - } -} - -#[allow(clippy::derive_hash_xor_eq)] -impl Hash for Peer { - fn hash(&self, state: &mut H) { - self.id.hash(state); - self.addr.hash(state); - } -} - -impl Eq for Peer {} - -impl RequestHeader { - #[inline] - pub fn new((cluster_id, member_id): (u64, u64)) -> Self { - Self { - protocol_version: PROTOCOL_VERSION, - cluster_id, - member_id, - } - } -} - -impl ResponseHeader { - #[inline] - pub fn success(cluster_id: u64) -> Self { - Self { - protocol_version: PROTOCOL_VERSION, - cluster_id, - ..Default::default() - } - } - - #[inline] - pub fn failed(cluster_id: u64, error: Error) -> Self { - Self { - protocol_version: PROTOCOL_VERSION, - cluster_id, - error: Some(error), - } - } - - #[inline] - pub fn is_not_leader(&self) -> bool { - if let Some(error) = &self.error { - if error.code == ErrorCode::NotLeader as i32 { - return true; - } - } - false - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ErrorCode { - NoActiveDatanodes = 1, - NotLeader = 2, -} - -impl Error { - #[inline] - pub fn no_active_datanodes() -> Self { - Self { - code: ErrorCode::NoActiveDatanodes as i32, - err_msg: "No active datanodes".to_string(), - } - } - - #[inline] - pub fn is_not_leader() -> Self { - Self { - code: ErrorCode::NotLeader as i32, - err_msg: "Current server is not leader".to_string(), - } - } -} - -impl HeartbeatResponse { - #[inline] - pub fn is_not_leader(&self) -> bool { - if let Some(header) = &self.header { - return header.is_not_leader(); - } - false - } -} - -macro_rules! gen_set_header { - ($req: ty) => { - impl $req { - #[inline] - pub fn set_header(&mut self, (cluster_id, member_id): (u64, u64)) { - self.header = Some(RequestHeader::new((cluster_id, member_id))); - } - } - }; -} - -gen_set_header!(HeartbeatRequest); -gen_set_header!(RouteRequest); -gen_set_header!(CreateRequest); -gen_set_header!(RangeRequest); -gen_set_header!(DeleteRequest); -gen_set_header!(PutRequest); -gen_set_header!(BatchPutRequest); -gen_set_header!(CompareAndPutRequest); -gen_set_header!(DeleteRangeRequest); -gen_set_header!(MoveValueRequest); - -#[cfg(test)] -mod tests { - use std::vec; - - use super::*; - - #[test] - fn test_peer_dict() { - let mut dict = PeerDict::default(); - - dict.get_or_insert(Peer { - id: 1, - addr: "111".to_string(), - }); - dict.get_or_insert(Peer { - id: 2, - addr: "222".to_string(), - }); - dict.get_or_insert(Peer { - id: 1, - addr: "111".to_string(), - }); - dict.get_or_insert(Peer { - id: 1, - addr: "111".to_string(), - }); - dict.get_or_insert(Peer { - id: 1, - addr: "111".to_string(), - }); - dict.get_or_insert(Peer { - id: 1, - addr: "111".to_string(), - }); - dict.get_or_insert(Peer { - id: 2, - addr: "222".to_string(), - }); - - assert_eq!(2, dict.index); - assert_eq!( - vec![ - Peer { - id: 1, - addr: "111".to_string(), - }, - Peer { - id: 2, - addr: "222".to_string(), - } - ], - dict.into_peers() - ); - } -} diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 16fc2ca86c..5c450253d5 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::v1::alter_expr::Kind; -use api::v1::{AlterExpr, CreateTableExpr, DropColumns, RenameTable}; +use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use snafu::{ensure, OptionExt, ResultExt}; @@ -42,12 +42,11 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { field: "column_def", })?; - let schema = - column_def - .try_as_column_schema() - .context(InvalidColumnDefSnafu { - column: &column_def.name, - })?; + let schema = column_def::try_as_column_schema(&column_def).context( + InvalidColumnDefSnafu { + column: &column_def.name, + }, + )?; Ok(AddColumnRequest { column_schema: schema, is_key: ac.is_key, @@ -98,8 +97,7 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result { .column_defs .iter() .map(|x| { - x.try_as_column_schema() - .context(InvalidColumnDefSnafu { column: &x.name }) + column_def::try_as_column_schema(x).context(InvalidColumnDefSnafu { column: &x.name }) }) .collect::>>()?; diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index c38ab72af7..c111d61b66 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -14,7 +14,8 @@ use std::collections::HashMap; -use api::v1::column::{SemanticType, Values}; +use api::helper::values_with_capacity; +use api::v1::column::SemanticType; use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use snafu::ensure; @@ -212,7 +213,7 @@ impl LinesWriter { batch.0.push(Column { column_name: column_name.to_string(), semantic_type: semantic_type.into(), - values: Some(Values::with_capacity(datatype, to_insert)), + values: Some(values_with_capacity(datatype, to_insert)), datatype: datatype as i32, null_mask: Vec::default(), }); diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index b3a4289636..053b673052 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -88,7 +88,7 @@ impl Instance { mod tests { use std::sync::Arc; - use api::v1::{ColumnDataType, ColumnDef, TableId}; + use api::v1::{column_def, ColumnDataType, ColumnDef, TableId}; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc_expr::create_table_schema; use datatypes::prelude::ConcreteDataType; @@ -146,7 +146,7 @@ mod tests { is_nullable: true, default_constraint: vec![], }; - let result = column_def.try_as_column_schema(); + let result = column_def::try_as_column_schema(&column_def); assert!(matches!( result.unwrap_err(), api::error::Error::UnknownColumnDataType { .. } @@ -158,7 +158,7 @@ mod tests { is_nullable: true, default_constraint: vec![], }; - let column_schema = column_def.try_as_column_schema().unwrap(); + let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); assert!(column_schema.is_nullable()); @@ -170,7 +170,7 @@ mod tests { is_nullable: true, default_constraint: default_constraint.clone().try_into().unwrap(), }; - let column_schema = column_def.try_as_column_schema().unwrap(); + let column_schema = column_def::try_as_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); assert!(column_schema.is_nullable()); diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index bf8e54f49c..fedfc8358e 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -19,7 +19,8 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, TableId, + column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, + TableId, }; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue}; @@ -511,9 +512,8 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result { let mut column_name_to_index_map = HashMap::new(); for (idx, column) in create_table.column_defs.iter().enumerate() { - let schema = column - .try_as_column_schema() - .context(error::InvalidColumnDefSnafu { + let schema = + column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu { column: &column.name, })?; let schema = schema.with_time_index(column.name == create_table.time_index); diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 9640540957..8919838cef 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use api::helper::ColumnDataTypeWrapper; +use api::helper::{push_vals, ColumnDataTypeWrapper}; use api::v1::column::SemanticType; use api::v1::{Column, InsertRequest as GrpcInsertRequest}; use client::Database; @@ -120,7 +120,7 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec>>()?;