Merge remote-tracking branch 'origin/main' into zhongzc/repartition-procedure-scaffold
.github/workflows/develop.yml (vendored, 2 lines changed)
@@ -632,7 +632,7 @@ jobs:
       - name: Unzip binaries
         run: tar -xvf ./bins.tar.gz
       - name: Run sqlness
-        run: RUST_BACKTRACE=1 ./bins/sqlness-runner ${{ matrix.mode.opts }} -c ./tests/cases --bins-dir ./bins --preserve-state
+        run: RUST_BACKTRACE=1 ./bins/sqlness-runner bare ${{ matrix.mode.opts }} -c ./tests/cases --bins-dir ./bins --preserve-state
       - name: Upload sqlness logs
         if: failure()
         uses: actions/upload-artifact@v4
Cargo.lock (generated, 8 lines changed)
@@ -5325,7 +5325,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3e821d0d405e6733690a4e4352812ba2ff780a3e#3e821d0d405e6733690a4e4352812ba2ff780a3e"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae"
 dependencies = [
  "prost 0.13.5",
  "prost-types 0.13.5",
@@ -5825,9 +5825,9 @@ dependencies = [

 [[package]]
 name = "hyper-util"
-version = "0.1.14"
+version = "0.1.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb"
+checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8"
 dependencies = [
  "base64 0.22.1",
  "bytes",
@@ -5841,7 +5841,7 @@ dependencies = [
  "libc",
  "percent-encoding",
  "pin-project-lite",
- "socket2 0.5.10",
+ "socket2 0.6.0",
  "tokio",
  "tower-service",
  "tracing",
Cargo.toml (2 lines changed)
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3e821d0d405e6733690a4e4352812ba2ff780a3e" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
Makefile (2 lines changed)
@@ -169,7 +169,7 @@ nextest: ## Install nextest tools.

 .PHONY: sqlness-test
 sqlness-test: ## Run sqlness test.
-	cargo sqlness ${SQLNESS_OPTS}
+	cargo sqlness bare ${SQLNESS_OPTS}

 RUNS ?= 1
 FUZZ_TARGET ?= fuzz_alter_table
flake.lock (generated, 18 lines changed)
@@ -8,11 +8,11 @@
       "rust-analyzer-src": "rust-analyzer-src"
     },
     "locked": {
-      "lastModified": 1745735608,
-      "narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
+      "lastModified": 1760078406,
+      "narHash": "sha256-JeJK0ZA845PtkCHkfo4KjeI1mYrsr2s3cxBYKhF4BoE=",
       "owner": "nix-community",
       "repo": "fenix",
-      "rev": "c39a78eba6ed2a022cc3218db90d485077101496",
+      "rev": "351277c60d104944122ee389cdf581c5ce2c6732",
       "type": "github"
     },
     "original": {
@@ -41,11 +41,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1748162331,
-        "narHash": "sha256-rqc2RKYTxP3tbjA+PB3VMRQNnjesrT0pEofXQTrMsS8=",
+        "lastModified": 1759994382,
+        "narHash": "sha256-wSK+3UkalDZRVHGCRikZ//CyZUJWDJkBDTQX1+G77Ow=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "7c43f080a7f28b2774f3b3f43234ca11661bf334",
+        "rev": "5da4a26309e796daa7ffca72df93dbe53b8164c7",
         "type": "github"
       },
       "original": {
@@ -65,11 +65,11 @@
     "rust-analyzer-src": {
       "flake": false,
       "locked": {
-        "lastModified": 1745694049,
-        "narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
+        "lastModified": 1760014945,
+        "narHash": "sha256-ySdl7F9+oeWNHVrg3QL/brazqmJvYFEdpGnF3pyoDH8=",
         "owner": "rust-lang",
         "repo": "rust-analyzer",
-        "rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
+        "rev": "90d2e1ce4dfe7dc49250a8b88a0f08ffdb9cb23f",
         "type": "github"
       },
       "original": {
flake.nix (2 lines changed)
@@ -19,7 +19,7 @@
       lib = nixpkgs.lib;
       rustToolchain = fenix.packages.${system}.fromToolchainName {
         name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
-        sha256 = "sha256-tJJr8oqX3YD+ohhPK7jlt/7kvKBnBqJVjYtoFr520d4=";
+        sha256 = "sha256-GCGEXGZeJySLND0KU5TdtTrqFV76TF3UdvAHSUegSsk=";
       };
     in
     {
rust-toolchain.toml (2 lines changed)
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "nightly-2025-05-19"
+channel = "nightly-2025-10-01"
@@ -15,7 +15,6 @@
 use std::collections::HashSet;
 use std::sync::Arc;

-use common_base::BitVec;
 use common_decimal::Decimal128;
 use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
 use common_time::time::Time;
@@ -24,9 +23,12 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
 use datatypes::prelude::{ConcreteDataType, ValueRef};
 use datatypes::scalars::ScalarVector;
 use datatypes::types::{
-    Int8Type, Int16Type, IntervalType, TimeType, TimestampType, UInt8Type, UInt16Type,
+    Int8Type, Int16Type, IntervalType, StructField, StructType, TimeType, TimestampType, UInt8Type,
+    UInt16Type,
 };
-use datatypes::value::{OrderedF32, OrderedF64, Value};
+use datatypes::value::{
+    ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
+};
 use datatypes::vectors::{
     BinaryVector, BooleanVector, DateVector, Decimal128Vector, Float32Vector, Float64Vector,
     Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
@@ -42,14 +44,14 @@ use greptime_proto::v1::query_request::Query;
 use greptime_proto::v1::value::ValueData;
 use greptime_proto::v1::{
     self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension,
-    QueryRequest, Row, SemanticType, VectorTypeExtension,
+    ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension,
 };
 use paste::paste;
 use snafu::prelude::*;

 use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result};
 use crate::v1::column::Values;
-use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
+use crate::v1::{ColumnDataType, Value as GrpcValue};

 /// ColumnDataTypeWrapper is a wrapper of ColumnDataType and ColumnDataTypeExtension.
 /// It could be used to convert with ConcreteDataType.
@@ -85,7 +87,7 @@ impl ColumnDataTypeWrapper {

     /// Get a tuple of ColumnDataType and ColumnDataTypeExtension.
     pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
-        (self.datatype, self.datatype_ext)
+        (self.datatype, self.datatype_ext.clone())
     }
 }
@@ -159,6 +161,45 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
                     ConcreteDataType::vector_default_datatype()
                 }
             }
+            ColumnDataType::List => {
+                if let Some(TypeExt::ListType(d)) = datatype_wrapper
+                    .datatype_ext
+                    .as_ref()
+                    .and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
+                {
+                    let item_type = ColumnDataTypeWrapper {
+                        datatype: d.datatype(),
+                        datatype_ext: d.datatype_extension.clone().map(|d| *d),
+                    };
+                    ConcreteDataType::list_datatype(item_type.into())
+                } else {
+                    // invalid state: type extension not found
+                    ConcreteDataType::null_datatype()
+                }
+            }
+            ColumnDataType::Struct => {
+                if let Some(TypeExt::StructType(d)) = datatype_wrapper
+                    .datatype_ext
+                    .as_ref()
+                    .and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
+                {
+                    let fields = d
+                        .fields
+                        .iter()
+                        .map(|f| {
+                            let field_type = ColumnDataTypeWrapper {
+                                datatype: f.datatype(),
+                                datatype_ext: f.datatype_extension.clone(),
+                            };
+                            StructField::new(f.name.clone(), field_type.into(), true)
+                        })
+                        .collect::<Vec<_>>();
+                    ConcreteDataType::struct_datatype(StructType::from(fields))
+                } else {
+                    // invalid state: type extension not found
+                    ConcreteDataType::null_datatype()
+                }
+            }
         }
     }
 }
@@ -249,6 +290,39 @@ impl ColumnDataTypeWrapper {
             }),
         }
     }

+    /// Create a list datatype with the given item type.
+    pub fn list_datatype(item_type: ColumnDataTypeWrapper) -> Self {
+        ColumnDataTypeWrapper {
+            datatype: ColumnDataType::List,
+            datatype_ext: Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension {
+                    datatype: item_type.datatype() as i32,
+                    datatype_extension: item_type.datatype_ext.map(Box::new),
+                }))),
+            }),
+        }
+    }
+
+    /// Create a struct datatype with the given field tuples (name, datatype).
+    pub fn struct_datatype(fields: Vec<(String, ColumnDataTypeWrapper)>) -> Self {
+        let struct_fields = fields
+            .into_iter()
+            .map(|(name, datatype)| greptime_proto::v1::StructField {
+                name,
+                datatype: datatype.datatype() as i32,
+                datatype_extension: datatype.datatype_ext,
+            })
+            .collect();
+        ColumnDataTypeWrapper {
+            datatype: ColumnDataType::Struct,
+            datatype_ext: Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::StructType(StructTypeExtension {
+                    fields: struct_fields,
+                })),
+            }),
+        }
+    }
 }

 impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
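A minimal round-trip sketch (not part of the diff; it combines the constructors above with the `From<ColumnDataTypeWrapper>` impl shown earlier) for a list-of-string column type:

    // Build the proto-side descriptor for a LIST<STRING> column ...
    let wrapper = ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype());
    // ... then convert it back into the engine-side ConcreteDataType.
    let concrete: ConcreteDataType = wrapper.into();
    assert_eq!(
        concrete,
        ConcreteDataType::list_datatype(ConcreteDataType::string_datatype())
    );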
@@ -290,9 +364,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
             ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128,
             ConcreteDataType::Json(_) => ColumnDataType::Json,
             ConcreteDataType::Vector(_) => ColumnDataType::Vector,
+            ConcreteDataType::List(_) => ColumnDataType::List,
+            ConcreteDataType::Struct(_) => ColumnDataType::Struct,
             ConcreteDataType::Null(_)
-                | ConcreteDataType::List(_)
-                | ConcreteDataType::Struct(_)
                 | ConcreteDataType::Dictionary(_)
                 | ConcreteDataType::Duration(_) => {
                     return error::IntoColumnDataTypeSnafu { from: datatype }.fail();
@@ -321,6 +395,40 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
                 })),
             })
         }
+        ColumnDataType::List => {
+            if let Some(list_type) = datatype.as_list() {
+                let list_item_type =
+                    ColumnDataTypeWrapper::try_from(list_type.item_type().clone())?;
+                Some(ColumnDataTypeExtension {
+                    type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension {
+                        datatype: list_item_type.datatype.into(),
+                        datatype_extension: list_item_type.datatype_ext.map(Box::new),
+                    }))),
+                })
+            } else {
+                None
+            }
+        }
+        ColumnDataType::Struct => {
+            if let Some(struct_type) = datatype.as_struct() {
+                let mut fields = Vec::with_capacity(struct_type.fields().len());
+                for field in struct_type.fields() {
+                    let field_type =
+                        ColumnDataTypeWrapper::try_from(field.data_type().clone())?;
+                    let proto_field = crate::v1::StructField {
+                        name: field.name().to_string(),
+                        datatype: field_type.datatype.into(),
+                        datatype_extension: field_type.datatype_ext,
+                    };
+                    fields.push(proto_field);
+                }
+                Some(ColumnDataTypeExtension {
+                    type_ext: Some(TypeExt::StructType(StructTypeExtension { fields })),
+                })
+            } else {
+                None
+            }
+        }
         _ => None,
     };
     Ok(Self {
@@ -448,56 +556,17 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
             binary_values: Vec::with_capacity(capacity),
             ..Default::default()
         },
+        ColumnDataType::List => Values {
+            list_values: Vec::with_capacity(capacity),
+            ..Default::default()
+        },
+        ColumnDataType::Struct => Values {
+            struct_values: Vec::with_capacity(capacity),
+            ..Default::default()
+        },
     }
 }

-// The type of vals must be same.
-pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
-    let values = column.values.get_or_insert_with(Values::default);
-    let mut null_mask = BitVec::from_slice(&column.null_mask);
-    let len = vector.len();
-    null_mask.reserve_exact(origin_count + len);
-    null_mask.extend(BitVec::repeat(false, len));
-
-    (0..len).for_each(|idx| match vector.get(idx) {
-        Value::Null => null_mask.set(idx + origin_count, true),
-        Value::Boolean(val) => values.bool_values.push(val),
-        Value::UInt8(val) => values.u8_values.push(val.into()),
-        Value::UInt16(val) => values.u16_values.push(val.into()),
-        Value::UInt32(val) => values.u32_values.push(val),
-        Value::UInt64(val) => values.u64_values.push(val),
-        Value::Int8(val) => values.i8_values.push(val.into()),
-        Value::Int16(val) => values.i16_values.push(val.into()),
-        Value::Int32(val) => values.i32_values.push(val),
-        Value::Int64(val) => values.i64_values.push(val),
-        Value::Float32(val) => values.f32_values.push(*val),
-        Value::Float64(val) => values.f64_values.push(*val),
-        Value::String(val) => values.string_values.push(val.as_utf8().to_string()),
-        Value::Binary(val) => values.binary_values.push(val.to_vec()),
-        Value::Date(val) => values.date_values.push(val.val()),
-        Value::Timestamp(val) => match val.unit() {
-            TimeUnit::Second => values.timestamp_second_values.push(val.value()),
-            TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()),
-            TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()),
-            TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()),
-        },
-        Value::Time(val) => match val.unit() {
-            TimeUnit::Second => values.time_second_values.push(val.value()),
-            TimeUnit::Millisecond => values.time_millisecond_values.push(val.value()),
-            TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()),
-            TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()),
-        },
-        Value::IntervalYearMonth(val) => values.interval_year_month_values.push(val.to_i32()),
-        Value::IntervalDayTime(val) => values.interval_day_time_values.push(val.to_i64()),
-        Value::IntervalMonthDayNano(val) => values
-            .interval_month_day_nano_values
-            .push(convert_month_day_nano_to_pb(val)),
-        Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)),
-        Value::List(_) | Value::Duration(_) => unreachable!(),
-    });
-    column.null_mask = null_mask.into_vec();
-}
-
 /// Returns the type name of the [Request].
 pub fn request_type(request: &Request) -> &'static str {
     match request {
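The new `List` and `Struct` arms mirror the existing ones: only the matching repeated field is pre-allocated. A short behavioral sketch (assuming the `values_with_capacity` shown above):

    let values = values_with_capacity(ColumnDataType::List, 1024);
    // Only `list_values` received capacity; the other fields are default-empty.
    assert_eq!(values.list_values.capacity(), 1024);
    assert!(values.struct_values.is_empty());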
@@ -555,7 +624,7 @@ pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 {

 pub fn pb_value_to_value_ref<'a>(
     value: &'a v1::Value,
-    datatype_ext: &'a Option<ColumnDataTypeExtension>,
+    datatype_ext: Option<&'a ColumnDataTypeExtension>,
 ) -> ValueRef<'a> {
     let Some(value) = &value.value_data else {
         return ValueRef::Null;
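Switching the parameter from `&'a Option<ColumnDataTypeExtension>` to `Option<&'a ColumnDataTypeExtension>` follows the usual API guideline: the callee no longer dictates where the `Option` lives, so callers adapt owned or boxed extensions with `as_ref()`/`as_deref()`, which is exactly what the recursive calls below do. A hypothetical call-site sketch (caller names assumed, not from the diff):

    let ext: Option<ColumnDataTypeExtension> = column_def.datatype_extension.clone();
    // `.as_ref()` turns the owned Option into `Option<&T>`; no per-value clone needed.
    let value_ref = pb_value_to_value_ref(&pb_value, ext.as_ref());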
@@ -622,6 +691,77 @@ pub fn pb_value_to_value_ref<'a>(
             ))
         }
     }
+    ValueData::ListValue(list) => {
+        let list_datatype_ext = datatype_ext
+            .as_ref()
+            .and_then(|ext| {
+                if let Some(TypeExt::ListType(l)) = &ext.type_ext {
+                    Some(l)
+                } else {
+                    None
+                }
+            })
+            .expect("list must contain datatype ext");
+        let item_type = ConcreteDataType::from(ColumnDataTypeWrapper::new(
+            list_datatype_ext.datatype(),
+            list_datatype_ext
+                .datatype_extension
+                .as_ref()
+                .map(|ext| *ext.clone()),
+        ));
+        let items = list
+            .items
+            .iter()
+            .map(|item| {
+                pb_value_to_value_ref(item, list_datatype_ext.datatype_extension.as_deref())
+            })
+            .collect::<Vec<_>>();
+
+        let list_value = ListValueRef::RefList {
+            val: items,
+            item_datatype: item_type.clone(),
+        };
+        ValueRef::List(list_value)
+    }
+
+    ValueData::StructValue(struct_value) => {
+        let struct_datatype_ext = datatype_ext
+            .as_ref()
+            .and_then(|ext| {
+                if let Some(TypeExt::StructType(s)) = &ext.type_ext {
+                    Some(s)
+                } else {
+                    None
+                }
+            })
+            .expect("struct must contain datatype ext");
+
+        let struct_fields = struct_datatype_ext
+            .fields
+            .iter()
+            .map(|field| {
+                let field_type = ConcreteDataType::from(ColumnDataTypeWrapper::new(
+                    field.datatype(),
+                    field.datatype_extension.clone(),
+                ));
+                let field_name = field.name.clone();
+                StructField::new(field_name, field_type, true)
+            })
+            .collect::<Vec<_>>();
+
+        let items = struct_value
+            .items
+            .iter()
+            .zip(struct_datatype_ext.fields.iter())
+            .map(|(item, field)| pb_value_to_value_ref(item, field.datatype_extension.as_ref()))
+            .collect::<Vec<ValueRef>>();
+
+        let struct_value_ref = StructValueRef::RefList {
+            val: items,
+            fields: StructType::new(struct_fields),
+        };
+        ValueRef::Struct(struct_value_ref)
+    }
     }
 }
@@ -896,8 +1036,8 @@ pub fn is_column_type_value_eq(
 }

 /// Convert value into proto's value.
-pub fn to_proto_value(value: Value) -> Option<v1::Value> {
-    let proto_value = match value {
+pub fn to_proto_value(value: Value) -> v1::Value {
+    match value {
         Value::Null => v1::Value { value_data: None },
         Value::Boolean(v) => v1::Value {
             value_data: Some(ValueData::BoolValue(v)),
@@ -983,10 +1123,34 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
         Value::Decimal128(v) => v1::Value {
             value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
         },
-        Value::List(_) | Value::Duration(_) => return None,
-    };
-
-    Some(proto_value)
+        Value::List(list_value) => v1::Value {
+            value_data: Some(ValueData::ListValue(v1::ListValue {
+                items: convert_list_to_pb_values(list_value),
+            })),
+        },
+        Value::Struct(struct_value) => v1::Value {
+            value_data: Some(ValueData::StructValue(v1::StructValue {
+                items: convert_struct_to_pb_values(struct_value),
+            })),
+        },
+        Value::Duration(_) => v1::Value { value_data: None },
+    }
+}
+
+fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
+    list_value
+        .take_items()
+        .into_iter()
+        .map(to_proto_value)
+        .collect()
+}
+
+fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
+    struct_value
+        .take_items()
+        .into_iter()
+        .map(to_proto_value)
+        .collect()
 }

 /// Returns the [ColumnDataTypeWrapper] of the value.
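With the new signature, `to_proto_value` is total: every engine `Value`, including lists and structs, maps to a proto `v1::Value` (durations map to a null payload), so callers no longer unwrap an `Option`. A small sketch mirroring the `test_list_to_pb_value` test added further down:

    let value = Value::List(ListValue::new(
        vec![Value::Boolean(true)],
        ConcreteDataType::boolean_datatype(),
    ));
    // No Option to unwrap anymore; the payload is a ListValue.
    let pb_value = to_proto_value(value);
    assert!(matches!(pb_value.value_data, Some(ValueData::ListValue(_))));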
@@ -1021,6 +1185,8 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
         ValueData::IntervalDayTimeValue(_) => ColumnDataType::IntervalDayTime,
         ValueData::IntervalMonthDayNanoValue(_) => ColumnDataType::IntervalMonthDayNano,
         ValueData::Decimal128Value(_) => ColumnDataType::Decimal128,
+        ValueData::ListValue(_) => ColumnDataType::List,
+        ValueData::StructValue(_) => ColumnDataType::Struct,
     };
     Some(value_type)
 }
@@ -1075,7 +1241,23 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
                 convert_month_day_nano_to_pb(v),
             )),
             Value::Decimal128(v) => Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
-            Value::List(_) | Value::Duration(_) => unreachable!(),
+            Value::List(list_value) => {
+                let items = list_value
+                    .take_items()
+                    .into_iter()
+                    .map(value_to_grpc_value)
+                    .collect();
+                Some(ValueData::ListValue(v1::ListValue { items }))
+            }
+            Value::Struct(struct_value) => {
+                let items = struct_value
+                    .take_items()
+                    .into_iter()
+                    .map(value_to_grpc_value)
+                    .collect();
+                Some(ValueData::StructValue(v1::StructValue { items }))
+            }
+            Value::Duration(_) => unreachable!(),
         },
     }
 }
@@ -1173,15 +1355,11 @@ mod tests {
         TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType,
         UInt32Type,
     };
-    use datatypes::vectors::{
-        BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
-        TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
-        TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
-        TimestampSecondVector, Vector,
-    };
+    use datatypes::vectors::BooleanVector;
     use paste::paste;

     use super::*;
-    use crate::v1::Column;

     #[test]
     fn test_values_with_capacity() {
@@ -1260,6 +1438,14 @@ mod tests {
         let values = values_with_capacity(ColumnDataType::Vector, 2);
         let values = values.binary_values;
         assert_eq!(2, values.capacity());
+
+        let values = values_with_capacity(ColumnDataType::List, 2);
+        let values = values.list_values;
+        assert_eq!(2, values.capacity());
+
+        let values = values_with_capacity(ColumnDataType::Struct, 2);
+        let values = values.struct_values;
+        assert_eq!(2, values.capacity());
     }

     #[test]
@@ -1352,6 +1538,37 @@ mod tests {
             ConcreteDataType::vector_datatype(3),
             ColumnDataTypeWrapper::vector_datatype(3).into()
         );
+        assert_eq!(
+            ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
+            ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()).into()
+        );
+        let struct_type = StructType::new(vec![
+            StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
+            StructField::new(
+                "name".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
+            StructField::new(
+                "address".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+        ]);
+        assert_eq!(
+            ConcreteDataType::struct_datatype(struct_type.clone()),
+            ColumnDataTypeWrapper::struct_datatype(vec![
+                ("id".to_string(), ColumnDataTypeWrapper::int64_datatype()),
+                ("name".to_string(), ColumnDataTypeWrapper::string_datatype()),
+                ("age".to_string(), ColumnDataTypeWrapper::int32_datatype()),
+                (
+                    "address".to_string(),
+                    ColumnDataTypeWrapper::string_datatype()
+                )
+            ])
+            .into()
+        );
     }

     #[test]
@@ -1455,176 +1672,29 @@ mod tests {
             "Failed to create column datatype from Null(NullType)"
         );

-        let result: Result<ColumnDataTypeWrapper> =
-            ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()).try_into();
-        assert!(result.is_err());
-        assert_eq!(
-            result.unwrap_err().to_string(),
-            "Failed to create column datatype from List(ListType { item_type: Boolean(BooleanType) })"
-        );
-    }
-
-    #[test]
-    fn test_column_put_timestamp_values() {
-        let mut column = Column {
-            column_name: "test".to_string(),
-            semantic_type: 0,
-            values: Some(Values {
-                ..Default::default()
-            }),
-            null_mask: vec![],
-            datatype: 0,
-            ..Default::default()
-        };
-
-        let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![1, 2, 3],
-            column.values.as_ref().unwrap().timestamp_nanosecond_values
-        );
-
-        let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![4, 5, 6],
-            column.values.as_ref().unwrap().timestamp_millisecond_values
-        );
-
-        let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![7, 8, 9],
-            column.values.as_ref().unwrap().timestamp_microsecond_values
-        );
-
-        let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![10, 11, 12],
-            column.values.as_ref().unwrap().timestamp_second_values
-        );
-    }
-
-    #[test]
-    fn test_column_put_time_values() {
-        let mut column = Column {
-            column_name: "test".to_string(),
-            semantic_type: 0,
-            values: Some(Values {
-                ..Default::default()
-            }),
-            null_mask: vec![],
-            datatype: 0,
-            ..Default::default()
-        };
-
-        let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![1, 2, 3],
-            column.values.as_ref().unwrap().time_nanosecond_values
-        );
-
-        let vector = Arc::new(TimeMillisecondVector::from_vec(vec![4, 5, 6]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![4, 5, 6],
-            column.values.as_ref().unwrap().time_millisecond_values
-        );
-
-        let vector = Arc::new(TimeMicrosecondVector::from_vec(vec![7, 8, 9]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![7, 8, 9],
-            column.values.as_ref().unwrap().time_microsecond_values
-        );
-
-        let vector = Arc::new(TimeSecondVector::from_vec(vec![10, 11, 12]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![10, 11, 12],
-            column.values.as_ref().unwrap().time_second_values
-        );
-    }
-
-    #[test]
-    fn test_column_put_interval_values() {
-        let mut column = Column {
-            column_name: "test".to_string(),
-            semantic_type: 0,
-            values: Some(Values {
-                ..Default::default()
-            }),
-            null_mask: vec![],
-            datatype: 0,
-            ..Default::default()
-        };
-
-        let vector = Arc::new(IntervalYearMonthVector::from_vec(vec![1, 2, 3]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![1, 2, 3],
-            column.values.as_ref().unwrap().interval_year_month_values
-        );
-
-        let vector = Arc::new(IntervalDayTimeVector::from_vec(vec![
-            IntervalDayTime::new(0, 4).into(),
-            IntervalDayTime::new(0, 5).into(),
-            IntervalDayTime::new(0, 6).into(),
-        ]));
-        push_vals(&mut column, 3, vector);
-        assert_eq!(
-            vec![4, 5, 6],
-            column.values.as_ref().unwrap().interval_day_time_values
-        );
-
-        let vector = Arc::new(IntervalMonthDayNanoVector::from_vec(vec![
-            IntervalMonthDayNano::new(0, 0, 7).into(),
-            IntervalMonthDayNano::new(0, 0, 8).into(),
-            IntervalMonthDayNano::new(0, 0, 9).into(),
-        ]));
-        let len = vector.len();
-        push_vals(&mut column, 3, vector);
-        (0..len).for_each(|i| {
-            assert_eq!(
-                7 + i as i64,
-                column
-                    .values
-                    .as_ref()
-                    .unwrap()
-                    .interval_month_day_nano_values
-                    .get(i)
-                    .unwrap()
-                    .nanoseconds
-            );
-        });
-    }
-
-    #[test]
-    fn test_column_put_vector() {
-        use crate::v1::SemanticType;
-        // Some(false), None, Some(true), Some(true)
-        let mut column = Column {
-            column_name: "test".to_string(),
-            semantic_type: SemanticType::Field as i32,
-            values: Some(Values {
-                bool_values: vec![false, true, true],
-                ..Default::default()
-            }),
-            null_mask: vec![2],
-            datatype: ColumnDataType::Boolean as i32,
-            ..Default::default()
-        };
-        let row_count = 4;
-
-        let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)]));
-        push_vals(&mut column, row_count, vector);
-        // Some(false), None, Some(true), Some(true), Some(true), None, Some(false)
-        let bool_values = column.values.unwrap().bool_values;
-        assert_eq!(vec![false, true, true, true, false], bool_values);
-        let null_mask = column.null_mask;
-        assert_eq!(34, null_mask[0]);
+        assert_eq!(
+            ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::int16_datatype()),
+            ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype())
+                .try_into()
+                .expect("Failed to create column datatype from List(ListType { item_type: Int16(Int16Type) })")
+        );
+        assert_eq!(
+            ColumnDataTypeWrapper::struct_datatype(vec![
+                ("a".to_string(), ColumnDataTypeWrapper::int64_datatype()),
+                (
+                    "a.a".to_string(),
+                    ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype())
+                )
+            ]),
+            ConcreteDataType::struct_datatype(StructType::new(vec![
+                StructField::new("a".to_string(), ConcreteDataType::int64_datatype(), true),
+                StructField::new(
+                    "a.a".to_string(),
+                    ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), true
+                )
+            ])).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })")
+        )
     }

     #[test]
@@ -2000,4 +2070,50 @@ mod tests {
         assert_eq!(pb_decimal.lo, 123);
         assert_eq!(pb_decimal.hi, 0);
     }
+
+    #[test]
+    fn test_list_to_pb_value() {
+        let value = Value::List(ListValue::new(
+            vec![Value::Boolean(true)],
+            ConcreteDataType::boolean_datatype(),
+        ));
+
+        let pb_value = to_proto_value(value);
+
+        match pb_value.value_data.unwrap() {
+            ValueData::ListValue(pb_list_value) => {
+                assert_eq!(pb_list_value.items.len(), 1);
+            }
+            _ => panic!("Unexpected value type"),
+        }
+    }
+
+    #[test]
+    fn test_struct_to_pb_value() {
+        let items = vec![Value::Boolean(true), Value::String("tom".into())];
+
+        let value = Value::Struct(
+            StructValue::try_new(
+                items,
+                StructType::new(vec![
+                    StructField::new(
+                        "a.a".to_string(),
+                        ConcreteDataType::boolean_datatype(),
+                        true,
+                    ),
+                    StructField::new("a.b".to_string(), ConcreteDataType::string_datatype(), true),
+                ]),
+            )
+            .unwrap(),
+        );
+
+        let pb_value = to_proto_value(value);
+
+        match pb_value.value_data.unwrap() {
+            ValueData::StructValue(pb_struct_value) => {
+                assert_eq!(pb_struct_value.items.len(), 2);
+            }
+            _ => panic!("Unexpected value type"),
+        }
+    }
 }
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-#![feature(let_chains)]
-
 pub mod error;
 pub mod helper;
@@ -37,8 +37,10 @@ const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";

 /// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
 pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
-    let data_type =
-        ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)?;
+    let data_type = ColumnDataTypeWrapper::try_new(
+        column_def.data_type,
+        column_def.datatype_extension.clone(),
+    )?;

     let constraint = if column_def.default_constraint.is_empty() {
         None
@@ -35,7 +35,7 @@ pub fn userinfo_by_name(username: Option<String>) -> UserInfoRef {
     DefaultUserInfo::with_name(username.unwrap_or_else(|| DEFAULT_USERNAME.to_string()))
 }

-pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
+pub fn user_provider_from_option(opt: &str) -> Result<UserProviderRef> {
     let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
         value: opt.to_string(),
         msg: "UserProviderOption must be in format `<option>:<value>`",
@@ -57,7 +57,7 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
     }
 }

-pub fn static_user_provider_from_option(opt: &String) -> Result<StaticUserProvider> {
+pub fn static_user_provider_from_option(opt: &str) -> Result<StaticUserProvider> {
     let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
         value: opt.to_string(),
         msg: "UserProviderOption must be in format `<option>:<value>`",
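Taking `&str` instead of `&String` accepts any string-like argument (an owned `String` field via deref coercion, a slice, or a literal) without forcing callers to hold an owned `String`. A hedged call-site sketch (the caller struct and the option value are assumptions, not from the diff; the `<option>:<value>` format comes from the error message above):

    // Works with a borrowed String field as before ...
    let provider = user_provider_from_option(&opts.user_provider)?;
    // ... and now also directly with a literal, with no String allocation.
    let provider = user_provider_from_option("static_user_provider:file:/etc/greptimedb/users")?;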
@@ -14,7 +14,6 @@

 #![feature(assert_matches)]
 #![feature(try_blocks)]
-#![feature(let_chains)]

 use std::any::Any;
 use std::fmt::{Debug, Formatter};
@@ -392,15 +392,15 @@ impl MemoryCatalogManager {
         if !manager.schema_exist_sync(catalog, schema).unwrap() {
             manager
                 .register_schema_sync(RegisterSchemaRequest {
-                    catalog: catalog.to_string(),
-                    schema: schema.to_string(),
+                    catalog: catalog.clone(),
+                    schema: schema.clone(),
                 })
                 .unwrap();
         }

         let request = RegisterTableRequest {
-            catalog: catalog.to_string(),
-            schema: schema.to_string(),
+            catalog: catalog.clone(),
+            schema: schema.clone(),
             table_name: table.table_info().name.clone(),
             table_id: table.table_info().ident.table_id,
             table,
@@ -56,14 +56,21 @@ pub struct ProcessManager {
 #[derive(Debug, Clone)]
 pub enum QueryStatement {
     Sql(Statement),
-    Promql(EvalStmt),
+    // The optional string is the alias of the PromQL query.
+    Promql(EvalStmt, Option<String>),
 }

 impl Display for QueryStatement {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
             QueryStatement::Sql(stmt) => write!(f, "{}", stmt),
-            QueryStatement::Promql(eval_stmt) => write!(f, "{}", eval_stmt),
+            QueryStatement::Promql(eval_stmt, alias) => {
+                if let Some(alias) = alias {
+                    write!(f, "{} AS {}", eval_stmt, alias)
+                } else {
+                    write!(f, "{}", eval_stmt)
+                }
+            }
         }
     }
 }
@@ -338,9 +345,9 @@ impl SlowQueryTimer {
         };

         match &self.stmt {
-            QueryStatement::Promql(stmt) => {
+            QueryStatement::Promql(stmt, _alias) => {
                 slow_query_event.is_promql = true;
-                slow_query_event.query = stmt.expr.to_string();
+                slow_query_event.query = self.stmt.to_string();
                 slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);

                 let start = stmt
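With the alias carried on the variant, `Display` now renders it when present, and that rendered form is what the slow-query event above records. A small sketch (hypothetical values; `EvalStmt` construction elided, and its exact display format is an assumption):

    // Assuming `eval_stmt` displays as `up{job="api"}`:
    let stmt = QueryStatement::Promql(eval_stmt, Some("api_up".to_string()));
    assert_eq!(stmt.to_string(), r#"up{job="api"} AS api_up"#);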
@@ -362,7 +362,7 @@ impl InformationSchemaProvider {
     }
     #[cfg(feature = "enterprise")]
     for name in self.extra_table_factories.keys() {
-        tables.insert(name.to_string(), self.build_table(name).expect(name));
+        tables.insert(name.clone(), self.build_table(name).expect(name));
     }
     // Add memory tables
     for name in MEMORY_TABLES.iter() {
@@ -254,9 +254,9 @@ impl InformationSchemaFlowsBuilder {
             .await
             .map_err(BoxedError::new)
             .context(InternalSnafu)?
-            .context(FlowInfoNotFoundSnafu {
-                catalog_name: catalog_name.to_string(),
-                flow_name: flow_name.to_string(),
+            .with_context(|| FlowInfoNotFoundSnafu {
+                catalog_name: catalog_name.clone(),
+                flow_name: flow_name.clone(),
             })?;
         self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
             .await?;
@@ -273,11 +273,11 @@ impl InformationSchemaFlowsBuilder {
         flow_stat: &Option<FlowStat>,
     ) -> Result<()> {
         let row = [
-            (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
+            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
             (FLOW_ID, &Value::from(flow_id)),
             (
                 TABLE_CATALOG,
-                &Value::from(flow_info.catalog_name().to_string()),
+                &Value::from(flow_info.catalog_name().clone()),
             ),
         ];
         if !predicates.eval(&row) {
@@ -135,7 +135,7 @@ async fn make_process_list(

     for process in queries {
         let display_id = DisplayProcessId {
-            server_addr: process.frontend.to_string(),
+            server_addr: process.frontend.clone(),
             id: process.id,
         }
         .to_string();
@@ -199,10 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
         if table_info.table_type == TableType::Temporary {
             Ok(None)
         } else {
-            Ok(Some((
-                table_info.ident.table_id,
-                table_info.name.to_string(),
-            )))
+            Ok(Some((table_info.ident.table_id, table_info.name.clone())))
         }
     });
@@ -166,7 +166,7 @@ impl CatalogInfo for CatalogManagerWrapper {
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))
     } else {
-        Ok(vec![self.catalog_name.to_string()])
+        Ok(vec![self.catalog_name.clone()])
     }
 }
@@ -201,7 +201,7 @@ impl DfTableSourceProvider {

     Ok(Arc::new(ViewTable::new(
         logical_plan,
-        Some(view_info.definition.to_string()),
+        Some(view_info.definition.clone()),
     )))
 }
@@ -41,7 +41,7 @@ impl DelKeyCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kv_backend = self.store.build().await?;
         Ok(Box::new(DelKeyTool {
-            key: self.key.to_string(),
+            key: self.key.clone(),
             prefix: self.prefix,
             key_deleter: KeyDeleter::new(kv_backend),
         }))
@@ -138,13 +138,7 @@ impl RepairTool {

         let table_names = table_names
             .iter()
-            .map(|table_name| {
-                (
-                    catalog.to_string(),
-                    schema_name.to_string(),
-                    table_name.to_string(),
-                )
-            })
+            .map(|table_name| (catalog.clone(), schema_name.clone(), table_name.clone()))
             .collect::<Vec<_>>();
         return Ok(IteratorInput::new_table_names(table_names));
     } else if !self.table_ids.is_empty() {
@@ -32,9 +32,9 @@ pub fn generate_alter_table_expr_for_all_columns(
     let schema = &table_info.meta.schema;

     let mut alter_table_expr = AlterTableExpr {
-        catalog_name: table_info.catalog_name.to_string(),
-        schema_name: table_info.schema_name.to_string(),
-        table_name: table_info.name.to_string(),
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
         ..Default::default()
     };
@@ -44,9 +44,9 @@ pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTab
     let table_options = HashMap::from(&table_info.meta.options);

     Ok(CreateTableExpr {
-        catalog_name: table_info.catalog_name.to_string(),
-        schema_name: table_info.schema_name.to_string(),
-        table_name: table_info.name.to_string(),
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
         desc: String::default(),
         column_defs,
         time_index,
@@ -54,7 +54,7 @@ pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTab
         create_if_not_exists: true,
         table_options,
         table_id: None,
-        engine: table_info.meta.engine.to_string(),
+        engine: table_info.meta.engine.clone(),
     })
 }
@@ -18,7 +18,7 @@ use common_error::define_from_tonic_status;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
-use snafu::{Location, Snafu, location};
+use snafu::{Location, Snafu};
 use tonic::Code;
 use tonic::metadata::errors::InvalidMetadataValue;
@@ -252,10 +252,10 @@ impl StartCommand {

     if let Some(addr) = &self.internal_rpc_bind_addr {
         if let Some(internal_grpc) = &mut opts.internal_grpc {
-            internal_grpc.bind_addr = addr.to_string();
+            internal_grpc.bind_addr = addr.clone();
         } else {
             let grpc_options = GrpcOptions {
-                bind_addr: addr.to_string(),
+                bind_addr: addr.clone(),
                 ..Default::default()
             };
@@ -265,10 +265,10 @@ impl StartCommand {

     if let Some(addr) = &self.internal_rpc_server_addr {
         if let Some(internal_grpc) = &mut opts.internal_grpc {
-            internal_grpc.server_addr = addr.to_string();
+            internal_grpc.server_addr = addr.clone();
         } else {
             let grpc_options = GrpcOptions {
-                server_addr: addr.to_string(),
+                server_addr: addr.clone(),
                 ..Default::default()
             };
             opts.internal_grpc = Some(grpc_options);
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-#![feature(assert_matches, let_chains)]
+#![feature(assert_matches)]

 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
@@ -75,11 +75,11 @@ impl Plugins {
         self.read().is_empty()
     }

-    fn read(&self) -> RwLockReadGuard<SendSyncAnyMap> {
+    fn read(&self) -> RwLockReadGuard<'_, SendSyncAnyMap> {
         self.inner.read().unwrap()
     }

-    fn write(&self) -> RwLockWriteGuard<SendSyncAnyMap> {
+    fn write(&self) -> RwLockWriteGuard<'_, SendSyncAnyMap> {
         self.inner.write().unwrap()
     }
 }
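These `'_` annotations (and the `Entry<'_, _, _>` and `TableReference<'_>` changes below) spell out lifetimes that were previously elided; newer toolchains lint when a return type silently hides a borrow of `&self`. A minimal standalone sketch of the pattern, under the assumption that the guard borrows from `self`:

    use std::sync::{RwLock, RwLockReadGuard};

    struct Registry {
        inner: RwLock<Vec<String>>,
    }

    impl Registry {
        // The explicit `'_` documents that the returned guard borrows `self`.
        fn read(&self) -> RwLockReadGuard<'_, Vec<String>> {
            self.inner.read().unwrap()
        }
    }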
@@ -150,7 +150,7 @@ impl<
     if let Some(ref mut writer) = self.writer {
         Ok(writer)
     } else {
-        let writer = (self.writer_factory)(self.path.to_string()).await?;
+        let writer = (self.writer_factory)(self.path.clone()).await?;
         Ok(self.writer.insert(writer))
     }
 }
@@ -87,7 +87,7 @@ pub(crate) fn scan_config(
 ) -> FileScanConfig {
     // object_store only recognize the Unix style path, so make it happy.
     let filename = &filename.replace('\\', "/");
-    let file_group = FileGroup::new(vec![PartitionedFile::new(filename.to_string(), 4096)]);
+    let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);

     FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
         .with_file_group(file_group)
@@ -71,7 +71,7 @@ impl FunctionRegistry {
     for alias in func.aliases() {
         let func: ScalarFunctionFactory = func.clone().into();
         let alias = ScalarFunctionFactory {
-            name: alias.to_string(),
+            name: alias.clone(),
             ..func
         };
         self.register(alias);
@@ -38,7 +38,7 @@ pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signat

 /// Cast a [`ValueRef`] to u64, returns `None` if fails
 pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
-    cast((*value).into(), &ConcreteDataType::uint64_datatype())
+    cast(value.clone().into(), &ConcreteDataType::uint64_datatype())
         .context(InvalidInputTypeSnafu {
             err_msg: format!(
                 "Failed to cast input into uint64, actual type: {:#?}",
@@ -50,7 +50,7 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {

 /// Cast a [`ValueRef`] to u32, returns `None` if fails
 pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
-    cast((*value).into(), &ConcreteDataType::uint32_datatype())
+    cast(value.clone().into(), &ConcreteDataType::uint32_datatype())
         .context(InvalidInputTypeSnafu {
             err_msg: format!(
                 "Failed to cast input into uint32, actual type: {:#?}",
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-#![feature(let_chains)]
 #![feature(try_blocks)]
 #![feature(assert_matches)]
@@ -41,7 +41,7 @@ where
     let right: &<R as Scalar>::VectorType = unsafe { Helper::static_cast(right.inner()) };
     let b = right.get_data(0);

-    let it = left.iter_data().map(|a| f(a, b, ctx));
+    let it = left.iter_data().map(|a| f(a, b.clone(), ctx));
     <O as Scalar>::VectorType::from_owned_iterator(it)
 }
@@ -62,7 +62,7 @@ where
     let a = left.get_data(0);

     let right: &<R as Scalar>::VectorType = unsafe { Helper::static_cast(r) };
-    let it = right.iter_data().map(|b| f(a, b, ctx));
+    let it = right.iter_data().map(|b| f(a.clone(), b, ctx));
     <O as Scalar>::VectorType::from_owned_iterator(it)
 }
|
||||
@@ -309,7 +309,7 @@ fn is_ipv6_in_range(ip: &Ipv6Addr, cidr_base: &Ipv6Addr, prefix_len: u8) -> Opti
|
||||
}
|
||||
|
||||
// If there's a partial byte to check
|
||||
if prefix_len % 8 != 0 && full_bytes < 16 {
|
||||
if !prefix_len.is_multiple_of(8) && full_bytes < 16 {
|
||||
let bits_to_check = prefix_len % 8;
|
||||
let mask = 0xFF_u8 << (8 - bits_to_check);
|
||||
|
||||
|
||||
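The recently stabilized `is_multiple_of` on unsigned integers states the divisibility intent directly, which is why clippy now suggests it over a hand-written `%` comparison (the same rewrite appears in the next two hunks). A tiny self-contained sketch:

    let prefix_len: u8 = 20;
    // Equivalent to `prefix_len % 8 != 0`, but reads as intent.
    if !prefix_len.is_multiple_of(8) {
        let bits_to_check = prefix_len % 8; // 4 remaining bits
        let mask = 0xFF_u8 << (8 - bits_to_check);
        assert_eq!(mask, 0b1111_0000);
    }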
@@ -36,7 +36,7 @@ pub fn as_veclit(arg: &ScalarValue) -> Result<Option<Cow<'_, [f32]>>> {

 /// Convert a u8 slice to a vector literal.
 pub fn binlit_as_veclit(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
-    if bytes.len() % std::mem::size_of::<f32>() != 0 {
+    if !bytes.len().is_multiple_of(size_of::<f32>()) {
         return InvalidFuncArgsSnafu {
             err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
         }
@@ -46,7 +46,7 @@ pub(crate) fn add_values_to_builder(
     Some(true) => builder.push_null(),
     _ => {
         builder
-            .try_push_value_ref(values[idx_of_values].as_value_ref())
+            .try_push_value_ref(&values[idx_of_values].as_value_ref())
             .context(CreateVectorSnafu)?;
         idx_of_values += 1
     }
@@ -167,7 +167,7 @@ pub fn build_create_table_expr(
         default_constraint: vec![],
         semantic_type,
         comment: String::new(),
-        datatype_extension: *datatype_extension,
+        datatype_extension: datatype_extension.clone(),
         options: options.clone(),
     });
 }
@@ -209,7 +209,7 @@ pub fn extract_new_columns(
         default_constraint: vec![],
         semantic_type: expr.semantic_type,
         comment: String::new(),
-        datatype_extension: *expr.datatype_extension,
+        datatype_extension: expr.datatype_extension.clone(),
         options: expr.options.clone(),
     });
     AddColumn {
@@ -425,7 +425,7 @@ mod tests {
     ConcreteDataType::from(
         ColumnDataTypeWrapper::try_new(
             decimal_column.data_type,
-            decimal_column.datatype_extension,
+            decimal_column.datatype_extension.clone(),
         )
         .unwrap()
     )
@@ -520,6 +520,7 @@ mod tests {
         .as_ref()
         .unwrap()
         .datatype_extension
+        .clone()
     )
     .unwrap()
 )
@@ -479,7 +479,7 @@ impl Pool {
         })
     }

-    fn entry(&self, addr: String) -> Entry<String, Channel> {
+    fn entry(&self, addr: String) -> Entry<'_, String, Channel> {
         self.channels.entry(addr)
     }
@@ -325,7 +325,7 @@ fn build_struct(
         let result = #fn_name(handler, query_ctx, &[]).await
             .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?;

-        builder.push_value_ref(result.as_value_ref());
+        builder.push_value_ref(&result.as_value_ref());
     } else {
         for i in 0..rows_num {
             let args: Vec<_> = columns.iter()
@@ -335,7 +335,7 @@ fn build_struct(
             let result = #fn_name(handler, query_ctx, &args).await
                 .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?;

-            builder.push_value_ref(result.as_value_ref());
+            builder.push_value_ref(&result.as_value_ref());
         }
     }
@@ -90,6 +90,24 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
                 Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) })
             }
         }
+        Some(TypeExt::ListType(ext)) => {
+            let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span());
+            quote! {
+                Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::ListType(ListTypeExtension { item_type: #item_type })) })
+            }
+        }
+        Some(TypeExt::StructType(ext)) => {
+            let fields = ext.fields.iter().map(|field| {
+                let field_name = syn::Ident::new(&field.name.clone(), ident.span());
+                let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span());
+                quote! {
+                    StructField { name: #field_name, type_: #field_type }
+                }
+            }).collect::<Vec<_>>();
+            quote! {
+                Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) })
+            }
+        }
         None => {
             quote! { None }
         }
@@ -184,7 +184,7 @@ pub(crate) fn convert_semantic_type_to_proto_semantic_type(
     }
 }

-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 pub(crate) struct ColumnDataTypeWithExtension {
     pub(crate) data_type: ColumnDataType,
     pub(crate) extension: Option<ColumnDataTypeExtension>,
@@ -260,7 +260,10 @@ pub(crate) fn get_column_data_type(
     infer_column_data_type: &Option<ColumnDataTypeWithExtension>,
     attribute: &ColumnAttribute,
 ) -> Option<ColumnDataTypeWithExtension> {
-    attribute.datatype.or(*infer_column_data_type)
+    attribute
+        .datatype
+        .clone()
+        .or_else(|| infer_column_data_type.clone())
 }

 /// Convert a column data type to a value data ident.
@@ -304,5 +307,7 @@ pub(crate) fn convert_column_data_type_to_value_data_ident(
     // Json is a special case, it is actually a string column.
     ColumnDataType::Json => format_ident!("StringValue"),
     ColumnDataType::Vector => format_ident!("VectorValue"),
+    ColumnDataType::List => format_ident!("ListValue"),
+    ColumnDataType::Struct => format_ident!("StructValue"),
 }
@@ -29,22 +29,17 @@ use crate::error;
 use crate::error::Result;
 use crate::heartbeat::utils::get_datanode_workloads;

-pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
-const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
-
 const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

 pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

 lazy_static! {
     pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
-        Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
+        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
     static ref DATANODE_STAT_KEY_PATTERN: Regex =
         Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
-    static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
-        "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
-    ))
-    .unwrap();
+    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
+        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
 }

 /// The key of the datanode stat in the storage.
@@ -297,7 +292,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
     rcus: value.rcus,
     wcus: value.wcus,
     approximate_bytes: value.approximate_bytes as u64,
-    engine: value.engine.to_string(),
+    engine: value.engine.clone(),
     role: RegionRole::from(value.role()),
     num_rows: region_stat.num_rows,
     memtable_size: region_stat.memtable_size,
@@ -85,7 +85,7 @@ impl<'a> AlterLogicalTableValidator<'a> {
     Ok(())
 }

-    fn table_names(&self) -> Vec<TableReference> {
+    fn table_names(&self) -> Vec<TableReference<'_>> {
         self.alters
             .iter()
             .map(|alter| {
@@ -75,7 +75,7 @@ fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExec
     let table_id = alter_data.table_id;
     let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
     let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
-        Some(new_table_name.to_string())
+        Some(new_table_name.clone())
     } else {
         None
     };
@@ -418,7 +418,7 @@ impl AlterTableData {
     }
 }

-    fn table_ref(&self) -> TableReference {
+    fn table_ref(&self) -> TableReference<'_> {
         self.task.table_ref()
     }
@@ -132,7 +132,7 @@ impl AlterTableExecutor {
     );

     table_metadata_manager
-        .rename_table(current_table_info_value, new_table_name.to_string())
+        .rename_table(current_table_info_value, new_table_name.clone())
         .await?;
     } else {
         debug!(
@@ -293,7 +293,7 @@ fn build_new_table_info(
         new_info.meta.next_column_id += columns.len() as u32;
     }
     AlterKind::RenameTable { new_table_name } => {
-        new_info.name = new_table_name.to_string();
+        new_info.name = new_table_name.clone();
     }
     AlterKind::DropColumns { .. }
     | AlterKind::ModifyColumnTypes { .. }
@@ -379,9 +379,10 @@ pub enum CreateFlowState {
 }

 /// The type of flow.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
 pub enum FlowType {
     /// The flow is a batching task.
+    #[default]
     Batching,
     /// The flow is a streaming task.
     Streaming,
@@ -393,12 +394,6 @@ impl FlowType {
     pub const FLOW_TYPE_KEY: &str = "flow_type";
 }

-impl Default for FlowType {
-    fn default() -> Self {
-        Self::Batching
-    }
-}
-
 impl fmt::Display for FlowType {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
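Deriving `Default` with a `#[default]` variant marker is equivalent to the hand-written impl it replaces here, and the same pattern is applied to `FlushStrategy` and `FlushErrorStrategy` further down. A minimal standalone sketch:

    #[derive(Debug, PartialEq, Default)]
    enum FlowType {
        #[default]
        Batching,
        Streaming,
    }

    assert_eq!(FlowType::default(), FlowType::Batching);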
@@ -52,7 +52,7 @@ impl CreateLogicalTablesProcedure {
     ensure!(
         task.create_table.create_if_not_exists,
         TableAlreadyExistsSnafu {
-            table_name: task.create_table.table_name.to_string(),
+            table_name: task.create_table.table_name.clone(),
         }
     );
     continue;
@@ -90,7 +90,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
         default_constraint: c.default_constraint.clone(),
         semantic_type: semantic_type as i32,
         comment: String::new(),
-        datatype_extension: c.datatype_extension,
+        datatype_extension: c.datatype_extension.clone(),
         options: c.options.clone(),
     }),
     column_id: i as u32,
@@ -119,7 +119,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea

     let template = CreateRequest {
         region_id: 0,
-        engine: create_table_expr.engine.to_string(),
+        engine: create_table_expr.engine.clone(),
         column_defs,
         primary_key,
         path: String::new(),
@@ -167,8 +167,8 @@ impl DropFlowProcedure {
     &[
         CacheIdent::FlowId(flow_id),
         CacheIdent::FlowName(FlowName {
-            catalog_name: flow_info_value.catalog_name.to_string(),
-            flow_name: flow_info_value.flow_name.to_string(),
+            catalog_name: flow_info_value.catalog_name.clone(),
+            flow_name: flow_info_value.flow_name.clone(),
         }),
         CacheIdent::DropFlow(DropFlow {
             flow_id,
@@ -291,7 +291,7 @@ impl DropTableData {
|
||||
}
|
||||
}
|
||||
|
||||
fn table_ref(&self) -> TableReference {
|
||||
fn table_ref(&self) -> TableReference<'_> {
|
||||
self.task.table_ref()
|
||||
}
|
||||
|
||||
|
||||
@@ -219,7 +219,7 @@ pub(crate) struct DropViewData {
|
||||
}
|
||||
|
||||
impl DropViewData {
|
||||
fn table_ref(&self) -> TableReference {
|
||||
fn table_ref(&self) -> TableReference<'_> {
|
||||
self.task.table_ref()
|
||||
}
|
||||
|
||||
|
||||
@@ -247,7 +247,7 @@ pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&s
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.map(|c| c.name.to_string())
|
||||
.map(|c| c.name.clone())
|
||||
.collect::<Vec<_>>(),
|
||||
expected_column_names
|
||||
);
|
||||
|
||||
@@ -98,10 +98,10 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
|
||||
.unwrap_or(0),
|
||||
version: 1,
|
||||
},
|
||||
name: expr.table_name.to_string(),
|
||||
desc: Some(expr.desc.to_string()),
|
||||
catalog_name: expr.catalog_name.to_string(),
|
||||
schema_name: expr.schema_name.to_string(),
|
||||
name: expr.table_name.clone(),
|
||||
desc: Some(expr.desc.clone()),
|
||||
catalog_name: expr.catalog_name.clone(),
|
||||
schema_name: expr.schema_name.clone(),
|
||||
meta: RawTableMeta {
|
||||
schema: RawSchema {
|
||||
column_schemas: expr
|
||||
@@ -126,7 +126,7 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
|
||||
})
|
||||
.collect(),
|
||||
value_indices: vec![],
|
||||
engine: expr.engine.to_string(),
|
||||
engine: expr.engine.clone(),
|
||||
next_column_id: expr.column_defs.len() as u32,
|
||||
region_numbers: vec![],
|
||||
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
|
||||
|
||||
@@ -218,7 +218,7 @@ pub struct PartialSuccessDatanodeHandler {
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
|
||||
let success = peer.id % 2 == 0;
|
||||
let success = peer.id.is_multiple_of(2);
|
||||
if success {
|
||||
Ok(RegionResponse::new(0))
|
||||
} else if self.retryable {
|
||||
|
||||
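`peer.id % 2 == 0` becomes `peer.id.is_multiple_of(2)`, the unsigned-integer helper stabilized in Rust 1.87; `Timestamp::from_splits` further down gets the same rewrite. Unlike `%`, the helper is total: a zero divisor answers `false` (or `true` for zero itself) instead of panicking.

fn main() {
    let peer_id: u64 = 42;
    assert_eq!(peer_id.is_multiple_of(2), peer_id % 2 == 0);
    // No panic on a zero divisor:
    assert!(!peer_id.is_multiple_of(0));
    assert!(0u64.is_multiple_of(0));
}
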
@@ -218,7 +218,7 @@ impl TruncateTableData {
        }
    }

    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        self.task.table_ref()
    }


@@ -302,35 +302,25 @@ pub struct DropFlow {
}

/// Strategy for executing flush operations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushStrategy {
    /// Synchronous operation that waits for completion and expects a reply
    #[default]
    Sync,
    /// Asynchronous hint operation (fire-and-forget, no reply expected)
    Async,
}

/// Error handling strategy for batch flush operations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlushErrorStrategy {
    /// Abort on first error (fail-fast)
    #[default]
    FailFast,
    /// Attempt to flush all regions and collect all errors
    TryAll,
}

impl Default for FlushStrategy {
    fn default() -> Self {
        Self::Sync
    }
}

impl Default for FlushErrorStrategy {
    fn default() -> Self {
        Self::FailFast
    }
}

/// Unified flush instruction supporting both single and batch operations
/// with configurable execution strategies and error handling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

@@ -2172,7 +2172,7 @@ mod tests {
            table_id,
            RegionInfo {
                engine: engine.to_string(),
                region_storage_path: region_storage_path.to_string(),
                region_storage_path: region_storage_path.clone(),
                region_options: HashMap::new(),
                region_wal_options: HashMap::new(),
            },
@@ -2191,7 +2191,7 @@ mod tests {
            table_id,
            RegionInfo {
                engine: engine.to_string(),
                region_storage_path: region_storage_path.to_string(),
                region_storage_path: region_storage_path.clone(),
                region_options: HashMap::new(),
                region_wal_options: HashMap::new(),
            },
@@ -2216,7 +2216,7 @@ mod tests {
            table_id,
            RegionInfo {
                engine: engine.to_string(),
                region_storage_path: region_storage_path.to_string(),
                region_storage_path: region_storage_path.clone(),
                region_options: HashMap::new(),
                region_wal_options: HashMap::new(),
            },
@@ -2247,7 +2247,7 @@ mod tests {
            table_id,
            RegionInfo {
                engine: engine.to_string(),
                region_storage_path: region_storage_path.to_string(),
                region_storage_path: region_storage_path.clone(),
                region_options: HashMap::new(),
                region_wal_options: HashMap::new(),
            },

@@ -113,7 +113,7 @@ impl TableInfoValue {
        }
    }

    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference::full(
            &self.table_info.catalog_name,
            &self.table_info.schema_name,
@@ -123,9 +123,9 @@ impl TableInfoValue {

    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.table_info.catalog_name.to_string(),
            schema_name: self.table_info.schema_name.to_string(),
            table_name: self.table_info.name.to_string(),
            catalog_name: self.table_info.catalog_name.clone(),
            schema_name: self.table_info.schema_name.clone(),
            table_name: self.table_info.name.clone(),
        }
    }


@@ -555,8 +555,8 @@ impl MySqlStore {
        sqlx::query(&sql_template_set.create_table_statement)
            .execute(&pool)
            .await
            .context(MySqlExecutionSnafu {
                sql: sql_template_set.create_table_statement.to_string(),
            .with_context(|_| MySqlExecutionSnafu {
                sql: sql_template_set.create_table_statement.clone(),
            })?;
        Ok(Arc::new(MySqlStore {
            max_txn_ops,

@@ -880,7 +880,7 @@ impl PgStore {
            .execute(&sql_template_set.create_table_statement, &[])
            .await
            .with_context(|_| PostgresExecutionSnafu {
                sql: sql_template_set.create_table_statement.to_string(),
                sql: sql_template_set.create_table_statement.clone(),
            })?;
        Ok(Arc::new(Self {
            max_txn_ops,
@@ -926,8 +926,8 @@ mod tests {
        client
            .execute(&sql_templates.create_table_statement, &[])
            .await
            .context(PostgresExecutionSnafu {
                sql: sql_templates.create_table_statement.to_string(),
            .with_context(|_| PostgresExecutionSnafu {
                sql: sql_templates.create_table_statement.clone(),
            })
            .unwrap();
        Some(PgStore {

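The `.context(...)` → `.with_context(|_| ...)` changes in the MySQL and Postgres stores switch to snafu's lazy form: the context selector, and the owned `sql` string inside it, are only constructed if the call actually fails. A hedged sketch with a made-up error type:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Failed to execute {sql}"))]
struct ExecutionError {
    sql: String,
    source: std::io::Error,
}

fn run(sql: &str) -> Result<(), ExecutionError> {
    std::fs::read("/nonexistent/path")
        .map(|_| ())
        // Eager form: `.context(ExecutionSnafu { sql: sql.to_owned() })`
        // builds the selector (and clones `sql`) even on the happy path.
        // Lazy form: the closure runs only when the Result is an Err.
        .with_context(|_| ExecutionSnafu { sql: sql.to_owned() })
}
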
@@ -13,8 +13,6 @@
// limitations under the License.

#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(duration_millis_float)]

pub mod cache;

@@ -71,7 +71,7 @@ impl State for ReconcileRegions {
            .column_metadatas
            .iter()
            .filter(|c| c.semantic_type == SemanticType::Tag)
            .map(|c| c.column_schema.name.to_string())
            .map(|c| c.column_schema.name.clone())
            .collect::<HashSet<_>>();
        let column_defs = self
            .column_metadatas

@@ -383,7 +383,7 @@ pub struct CreateViewTask {

impl CreateViewTask {
    /// Returns the [`TableReference`] of view.
    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
@@ -496,7 +496,7 @@ pub struct DropViewTask {

impl DropViewTask {
    /// Returns the [`TableReference`] of view.
    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
@@ -553,7 +553,7 @@ pub struct DropTableTask {
}

impl DropTableTask {
    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
@@ -563,9 +563,9 @@ impl DropTableTask {

    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.to_string(),
            schema_name: self.schema.to_string(),
            table_name: self.table.to_string(),
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}
@@ -659,13 +659,13 @@ impl CreateTableTask {
        let table = &self.create_table;

        TableName {
            catalog_name: table.catalog_name.to_string(),
            schema_name: table.schema_name.to_string(),
            table_name: table.table_name.to_string(),
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }

    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        let table = &self.create_table;

        TableReference {
@@ -750,7 +750,7 @@ impl AlterTableTask {
        Ok(())
    }

    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.alter_table.catalog_name,
            schema: &self.alter_table.schema_name,
@@ -762,9 +762,9 @@ impl AlterTableTask {
        let table = &self.alter_table;

        TableName {
            catalog_name: table.catalog_name.to_string(),
            schema_name: table.schema_name.to_string(),
            table_name: table.table_name.to_string(),
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }
}
@@ -834,7 +834,7 @@ pub struct TruncateTableTask {
}

impl TruncateTableTask {
    pub fn table_ref(&self) -> TableReference {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
@@ -844,9 +844,9 @@ impl TruncateTableTask {

    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.to_string(),
            schema_name: self.schema.to_string(),
            table_name: self.table.to_string(),
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}
@@ -1344,7 +1344,7 @@ impl From<QueryContextRef> for QueryContext {
    fn from(query_context: QueryContextRef) -> Self {
        QueryContext {
            current_catalog: query_context.current_catalog().to_string(),
            current_schema: query_context.current_schema().to_string(),
            current_schema: query_context.current_schema().clone(),
            timezone: query_context.timezone().to_string(),
            extensions: query_context.extensions(),
            channel: query_context.channel() as u8,
@@ -1401,7 +1401,7 @@ impl From<QueryContextRef> for FlowQueryContext {
    fn from(ctx: QueryContextRef) -> Self {
        Self {
            catalog: ctx.current_catalog().to_string(),
            schema: ctx.current_schema().to_string(),
            schema: ctx.current_schema().clone(),
            timezone: ctx.timezone().to_string(),
        }
    }

@@ -128,7 +128,7 @@ pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedur
        let (status, error) = procedure_state_to_pb_state(&meta.state);
        PbProcedureMeta {
            id: Some(pid_to_pb_pid(meta.id)),
            type_name: meta.type_name.to_string(),
            type_name: meta.type_name.clone(),
            status: status.into(),
            start_time_ms: meta.start_time_ms,
            end_time_ms: meta.end_time_ms,

@@ -121,7 +121,7 @@ impl StateStore for KvStateStore {
        self.kv_backend
            .put(
                PutRequest::new()
                    .with_key(key.to_string().into_bytes())
                    .with_key(key.clone().into_bytes())
                    .with_value(value),
            )
            .await
@@ -145,7 +145,7 @@ impl StateStore for KvStateStore {
            let key = if idx > 0 {
                KeySet::with_segment_suffix(&key, idx)
            } else {
                key.to_string()
                key.clone()
            };
            let kv_backend = self.kv_backend.clone();
            async move {

@@ -169,7 +169,7 @@ impl ActiveBucket {
            }
        } else {
            datanode_stats.insert(
                stat.topic.to_string(),
                stat.topic.clone(),
                PartialTopicStat {
                    latest_entry_id: stat.latest_entry_id,
                    record_size: stat.record_size,
@@ -205,7 +205,7 @@ impl ActiveBucket {
        let record_num = stats.iter().map(|stat| stat.record_num).sum::<u64>();

        output.insert(
            topic.to_string(),
            topic.clone(),
            HistoryTopicStat {
                latest_entry_id,
                record_size,

@@ -104,8 +104,8 @@ impl KafkaTopicCreator {
        let end_offset = partition_client
            .get_offset(OffsetAt::Latest)
            .await
            .context(KafkaGetOffsetSnafu {
                topic: topic.to_string(),
            .with_context(|_| KafkaGetOffsetSnafu {
                topic: topic.clone(),
                partition: DEFAULT_PARTITION,
            })?;
        if end_offset > 0 {
@@ -284,9 +284,11 @@ mod tests {
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        let topics = std::slice::from_ref(&topic);

        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.delete_topics(topics).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
@@ -309,10 +311,12 @@ mod tests {
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        let topics = std::slice::from_ref(&topic);

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 2).await.unwrap();

@@ -336,12 +340,14 @@ mod tests {
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        let topics = std::slice::from_ref(&topic);

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        // Should be ok
        creator.create_topics(&[topic.to_string()]).await.unwrap();
        creator.create_topics(topics).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
@@ -356,10 +362,12 @@ mod tests {
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        let topics = std::slice::from_ref(&topic);

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        creator.prepare_topic(&topic).await.unwrap();

        let partition_client = creator.partition_client(&topic).await.unwrap();
@@ -382,10 +390,12 @@ mod tests {
        let creator = test_topic_creator(get_kafka_endpoints()).await;

        let topic = format!("{}{}", prefix, "0");
        // Clean up the topics before test
        creator.delete_topics(&[topic.to_string()]).await.unwrap();
        let topics = std::slice::from_ref(&topic);

        creator.create_topics(&[topic.to_string()]).await.unwrap();
        // Clean up the topics before test
        creator.delete_topics(topics).await.unwrap();

        creator.create_topics(topics).await.unwrap();
        let partition_client = creator.partition_client(&topic).await.unwrap();
        append_records(&partition_client, 10).await.unwrap();

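These Kafka test hunks replace `&[topic.to_string()]` with `std::slice::from_ref(&topic)`: a length-one `&[String]` borrowed directly from the existing value, with no clone and no temporary array (the `DynamicKeyLockGuard::drop` change below uses the same helper). Roughly:

fn takes_slice(topics: &[String]) -> usize {
    topics.len()
}

fn main() {
    let topic = String::from("greptimedb_wal_topic_0");
    // Before: clones the String just to build a temporary one-element array.
    assert_eq!(takes_slice(&[topic.to_string()]), 1);
    // After: borrows the existing String as a one-element slice.
    assert_eq!(takes_slice(std::slice::from_ref(&topic)), 1);
    // `topic` is still usable; nothing was moved or cloned.
    assert_eq!(topic.len(), 22);
}
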
@@ -47,7 +47,7 @@ impl KafkaTopicManager {
        let mut topics_to_create = Vec::with_capacity(all_topics.len());
        for topic in all_topics {
            if !existing_topic_set.contains(topic) {
                topics_to_create.push(topic.to_string());
                topics_to_create.push(topic.clone());
            }
        }
        Ok(topics_to_create)

@@ -195,18 +195,18 @@ pub async fn acquire_dynamic_key_lock(
) -> DynamicKeyLockGuard {
    match key {
        StringKey::Share(key) => {
            let guard = lock.read(key.to_string()).await;
            let guard = lock.read(key.clone()).await;
            DynamicKeyLockGuard {
                guard: Some(OwnedKeyRwLockGuard::from(guard)),
                key: key.to_string(),
                key: key.clone(),
                lock: lock.clone(),
            }
        }
        StringKey::Exclusive(key) => {
            let guard = lock.write(key.to_string()).await;
            let guard = lock.write(key.clone()).await;
            DynamicKeyLockGuard {
                guard: Some(OwnedKeyRwLockGuard::from(guard)),
                key: key.to_string(),
                key: key.clone(),
                lock: lock.clone(),
            }
        }
@@ -227,7 +227,7 @@ impl Drop for DynamicKeyLockGuard {
        if let Some(guard) = self.guard.take() {
            drop(guard);
        }
        self.lock.clean_keys(&[self.key.to_string()]);
        self.lock.clean_keys(std::slice::from_ref(&self.key));
    }
}


@@ -1616,7 +1616,7 @@ mod tests {
            .unwrap();

        // If the procedure is poisoned, the poison key shouldn't be deleted.
        assert_eq!(&procedure_id.to_string(), ROOT_ID);
        assert_eq!(&procedure_id.clone(), ROOT_ID);
    }

    #[tokio::test]

@@ -37,6 +37,7 @@ macro_rules! proc_path {
    ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
}

#[cfg(test)]
pub(crate) use proc_path;

/// Serialized data of a procedure.
@@ -579,13 +580,7 @@ mod tests {
        let type_name = procedure.type_name().to_string();
        let data = procedure.dump().unwrap();
        store
            .store_procedure(
                procedure_id,
                0,
                type_name.to_string(),
                data.to_string(),
                None,
            )
            .store_procedure(procedure_id, 0, type_name.clone(), data.clone(), None)
            .await
            .unwrap();
        let message = ProcedureMessage {

@@ -68,7 +68,7 @@ impl KeySet {

    pub fn keys(&self) -> Vec<String> {
        let mut keys = Vec::with_capacity(self.segments + 1);
        keys.push(self.key.to_string());
        keys.push(self.key.clone());
        for i in 1..=self.segments {
            keys.push(Self::with_segment_suffix(&self.key, i))
        }

@@ -41,7 +41,7 @@ impl PoisonStore for InMemoryPoisonStore {
        let mut map = self.map.write().unwrap();
        match map.entry(key) {
            Entry::Vacant(v) => {
                v.insert(token.to_string());
                v.insert(token.clone());
            }
            Entry::Occupied(o) => {
                let value = o.get();

@@ -145,7 +145,7 @@ impl From<&AddColumnLocation> for Location {
            },
            AddColumnLocation::After { column_name } => Location {
                location_type: LocationType::After.into(),
                after_column_name: column_name.to_string(),
                after_column_name: column_name.clone(),
            },
        }
    }

@@ -137,7 +137,7 @@ pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
    for column in schema.column_schemas() {
        if matches!(column.data_type, ConcreteDataType::Json(_)) {
            new_columns.push(ColumnSchema::new(
                column.name.to_string(),
                column.name.clone(),
                ConcreteDataType::string_datatype(),
                column.is_nullable(),
            ));

@@ -220,7 +220,7 @@ pub fn sql_value_to_value(
            _ => return InvalidUnaryOpSnafu { unary_op, value }.fail(),
        },

        Value::String(_) | Value::Binary(_) | Value::List(_) => {
        Value::String(_) | Value::Binary(_) | Value::List(_) | Value::Struct(_) => {
            return InvalidUnaryOpSnafu { unary_op, value }.fail();
        }
    }

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

mod df_substrait;
pub mod error;
pub mod extension_serializer;

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]
#![feature(duration_constructors)]

pub mod logging;

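Several crate roots in this diff trim their `#![feature(...)]` lists. Assuming the dropped gate is `let_chains` (it is the one that disappears from these headers), that lines up with let-chains stabilizing in Rust 1.88 on the 2024 edition, where this compiles on stable with no attribute:

fn gap(a: Option<i32>, b: Option<i32>) -> i32 {
    // A let-chain: multiple `let` bindings and conditions in one `if`.
    if let Some(x) = a
        && let Some(y) = b
        && x < y
    {
        return y - x;
    }
    0
}
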
@@ -454,7 +454,7 @@ fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
        .as_ref()
        .map(|e| {
            if e.starts_with("http") {
                e.to_string()
                e.clone()
            } else {
                format!("http://{}", e)
            }

@@ -308,12 +308,12 @@ impl Timestamp {
    fn from_splits(sec: i64, nsec: u32) -> Option<Self> {
        if nsec == 0 {
            Some(Timestamp::new_second(sec))
        } else if nsec % 1_000_000 == 0 {
        } else if nsec.is_multiple_of(1_000_000) {
            let millis = nsec / 1_000_000;
            sec.checked_mul(1000)
                .and_then(|v| v.checked_add(millis as i64))
                .map(Timestamp::new_millisecond)
        } else if nsec % 1000 == 0 {
        } else if nsec.is_multiple_of(1_000) {
            let micros = nsec / 1000;
            sec.checked_mul(1_000_000)
                .and_then(|v| v.checked_add(micros as i64))

@@ -13,7 +13,6 @@
// limitations under the License.

#![feature(assert_matches)]
#![feature(duration_constructors_lite)]

use std::net::SocketAddr;


@@ -13,7 +13,6 @@
// limitations under the License.

#![feature(assert_matches)]
#![feature(let_chains)]

pub mod alive_keeper;
pub mod config;

@@ -706,7 +706,7 @@ impl RegionServerParallelism {
        })
    }

    pub async fn acquire(&self) -> Result<SemaphorePermit> {
    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
        timeout(self.timeout, self.semaphore.acquire())
            .await
            .context(ConcurrentQueryLimiterTimeoutSnafu)?
@@ -919,8 +919,7 @@ impl RegionServerInner {
            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
                requests.push((region_id, request));
            } else {
                engine_grouped_requests
                    .insert(request.engine.to_string(), vec![(region_id, request)]);
                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
            }
        }


@@ -285,6 +285,13 @@ impl ConcreteDataType {
        }
    }

    pub fn as_struct(&self) -> Option<&StructType> {
        match self {
            ConcreteDataType::Struct(s) => Some(s),
            _ => None,
        }
    }

    /// Try to cast data type as a [`TimestampType`].
    pub fn as_timestamp(&self) -> Option<TimestampType> {
        match self {

@@ -220,6 +220,14 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Failed to convert scalar value to Arrow array"))]
    ConvertScalarToArrowArray {
        #[snafu(source)]
        error: datafusion_common::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to parse extended type in metadata: {}", value))]
    ParseExtendedType {
        value: String,
@@ -238,6 +246,13 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Inconsistent struct field count {field_len} and item count {item_len}"))]
    InconsistentStructFieldsAndItems {
        field_len: usize,
        item_len: usize,
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Failed to process JSONB value"))]
    InvalidJsonb {
        error: jsonb::Error,
@@ -282,7 +297,9 @@ impl ErrorExt for Error {
            | ToScalarValue { .. }
            | TryFromValue { .. }
            | ConvertArrowArrayToScalars { .. }
            | ParseExtendedType { .. } => StatusCode::Internal,
            | ConvertScalarToArrowArray { .. }
            | ParseExtendedType { .. }
            | InconsistentStructFieldsAndItems { .. } => StatusCode::Internal,
        }
    }


@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]
#![feature(assert_matches)]

pub mod arrow_array;

@@ -21,10 +21,10 @@ use crate::types::{
    Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
    UInt32Type, UInt64Type,
};
use crate::value::{ListValue, ListValueRef, Value};
use crate::value::{ListValue, ListValueRef, StructValue, StructValueRef, Value};
use crate::vectors::{
    BinaryVector, BooleanVector, DateVector, Decimal128Vector, ListVector, MutableVector,
    PrimitiveVector, StringVector, Vector,
    NullVector, PrimitiveVector, StringVector, StructVector, Vector,
};

fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
@@ -52,7 +52,7 @@
    fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short>;
}

pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Copy + Send + 'a {
pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Send + 'a {
    /// The corresponding [`Scalar`] type.
    type ScalarType: Scalar<RefType<'a> = Self>;

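Dropping `Copy` from `ScalarRef`'s supertraits is the enabling change for the struct support added below: `StructValueRef<'_>` can carry owned data (a `Vec`, for instance) and so can be `Clone` but not `Copy`. The cost shows up in generic code, `item.clone()` where `*item` used to suffice, as the `from_slice` and `build_vector_from_slice` hunks illustrate. In miniature, under the relaxed bound:

use std::fmt::Debug;

trait ScalarRef<'a>: Debug + Clone + Send + 'a {}

// Copy types still qualify...
impl<'a> ScalarRef<'a> for &'a str {}

// ...and so does a non-Copy ref type carrying owned data.
#[derive(Debug, Clone)]
struct StructRef<'a> {
    fields: Vec<&'a str>,
}

impl<'a> ScalarRef<'a> for StructRef<'a> {}

fn duplicate<'a, T: ScalarRef<'a>>(items: &[T]) -> Vec<T> {
    // With a `Copy` bound this could be `*item`; `Clone` needs an explicit clone.
    items.iter().map(|item| item.clone()).collect()
}
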
@@ -95,7 +95,7 @@
    fn from_slice(data: &[Self::RefItem<'_>]) -> Self {
        let mut builder = Self::Builder::with_capacity(data.len());
        for item in data {
            builder.push(Some(*item));
            builder.push(Some(item.clone()));
        }
        builder.finish()
    }
@@ -152,13 +152,11 @@ macro_rules! impl_scalar_for_native {
            type VectorType = PrimitiveVector<$DataType>;
            type RefType<'a> = $Native;

            #[inline]
            fn as_scalar_ref(&self) -> $Native {
                *self
            }

            #[allow(clippy::needless_lifetimes)]
            #[inline]
            fn upcast_gat<'short, 'long: 'short>(long: $Native) -> $Native {
                long
            }
@@ -168,7 +166,6 @@ macro_rules! impl_scalar_for_native {
        impl<'a> ScalarRef<'a> for $Native {
            type ScalarType = $Native;

            #[inline]
            fn to_owned_scalar(&self) -> $Native {
                *self
            }
@@ -187,17 +184,33 @@ impl_scalar_for_native!(i64, Int64Type);
impl_scalar_for_native!(f32, Float32Type);
impl_scalar_for_native!(f64, Float64Type);

impl Scalar for () {
    type VectorType = NullVector;
    type RefType<'a> = ();

    fn as_scalar_ref(&self) {}

    #[allow(clippy::needless_lifetimes)]
    fn upcast_gat<'short, 'long: 'short>(long: ()) {
        long
    }
}

impl ScalarRef<'_> for () {
    type ScalarType = ();

    fn to_owned_scalar(&self) {}
}

impl Scalar for bool {
    type VectorType = BooleanVector;
    type RefType<'a> = bool;

    #[inline]
    fn as_scalar_ref(&self) -> bool {
        *self
    }

    #[allow(clippy::needless_lifetimes)]
    #[inline]
    fn upcast_gat<'short, 'long: 'short>(long: bool) -> bool {
        long
    }
@@ -206,7 +219,6 @@ impl Scalar for bool {
impl ScalarRef<'_> for bool {
    type ScalarType = bool;

    #[inline]
    fn to_owned_scalar(&self) -> bool {
        *self
    }
@@ -216,12 +228,10 @@ impl Scalar for String {
    type VectorType = StringVector;
    type RefType<'a> = &'a str;

    #[inline]
    fn as_scalar_ref(&self) -> &str {
        self
    }

    #[inline]
    fn upcast_gat<'short, 'long: 'short>(long: &'long str) -> &'short str {
        long
    }
@@ -230,7 +240,6 @@ impl Scalar for String {
impl<'a> ScalarRef<'a> for &'a str {
    type ScalarType = String;

    #[inline]
    fn to_owned_scalar(&self) -> String {
        self.to_string()
    }
@@ -240,12 +249,10 @@ impl Scalar for Vec<u8> {
    type VectorType = BinaryVector;
    type RefType<'a> = &'a [u8];

    #[inline]
    fn as_scalar_ref(&self) -> &[u8] {
        self
    }

    #[inline]
    fn upcast_gat<'short, 'long: 'short>(long: &'long [u8]) -> &'short [u8] {
        long
    }
@@ -254,7 +261,6 @@ impl Scalar for Vec<u8> {
impl<'a> ScalarRef<'a> for &'a [u8] {
    type ScalarType = Vec<u8>;

    #[inline]
    fn to_owned_scalar(&self) -> Vec<u8> {
        self.to_vec()
    }
@@ -332,6 +338,42 @@ impl<'a> ScalarRef<'a> for ListValueRef<'a> {
                _ => unreachable!(),
            },
            ListValueRef::Ref { val } => (*val).clone(),
            ListValueRef::RefList { val, item_datatype } => ListValue::new(
                val.iter().map(|v| Value::from(v.clone())).collect(),
                item_datatype.clone(),
            ),
        }
    }
}

impl Scalar for StructValue {
    type VectorType = StructVector;
    type RefType<'a> = StructValueRef<'a>;

    fn as_scalar_ref(&self) -> Self::RefType<'_> {
        StructValueRef::Ref(self)
    }

    fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> {
        long
    }
}

impl<'a> ScalarRef<'a> for StructValueRef<'a> {
    type ScalarType = StructValue;

    fn to_owned_scalar(&self) -> Self::ScalarType {
        match self {
            Self::Indexed { vector, idx } => match vector.get(*idx) {
                Value::Null => StructValue::default(),
                Value::Struct(v) => v,
                _ => unreachable!(),
            },
            StructValueRef::Ref(val) => (*val).clone(),
            StructValueRef::RefList { val, fields } => {
                let items = val.iter().map(|v| Value::from(v.clone())).collect();
                StructValue::try_new(items, fields.clone()).unwrap()
            }
        }
    }
}
@@ -346,7 +388,7 @@ mod tests {
    fn build_vector_from_slice<T: ScalarVector>(items: &[Option<T::RefItem<'_>>]) -> T {
        let mut builder = T::Builder::with_capacity(items.len());
        for item in items {
            builder.push(*item);
            builder.push(item.clone());
        }
        builder.finish()
    }

@@ -329,7 +329,7 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
        let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
        for field in &arrow_schema.fields {
            let column_schema = ColumnSchema::try_from(field.as_ref())?;
            let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
            column_schemas.push(column_schema);
        }


@@ -296,7 +296,7 @@ impl ColumnSchema {
        let value_ref = padding_value.as_value_ref();
        let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
        for _ in 0..num_rows {
            mutable_vector.push_value_ref(value_ref);
            mutable_vector.push_value_ref(&value_ref);
        }
        mutable_vector.to_vector()
    }

@@ -157,7 +157,7 @@ impl ColumnDefaultConstraint {
            // attempt to downcast the vector fail if they don't check whether the vector is const
            // first.
            let mut mutable_vector = data_type.create_mutable_vector(1);
            mutable_vector.try_push_value_ref(v.as_value_ref())?;
            mutable_vector.try_push_value_ref(&v.as_value_ref())?;
            let base_vector = mutable_vector.to_vector();
            Ok(base_vector.replicate(&[num_rows]))
        }

@@ -83,10 +83,10 @@ impl LogicalPrimitiveType for DateType {
        })
    }

    fn cast_value_ref(value: ValueRef) -> Result<Option<Date>> {
    fn cast_value_ref(value: &ValueRef) -> Result<Option<Date>> {
        match value {
            ValueRef::Null => Ok(None),
            ValueRef::Date(v) => Ok(Some(v)),
            ValueRef::Date(v) => Ok(Some(*v)),
            other => error::CastTypeSnafu {
                msg: format!("Failed to cast value {other:?} to Date"),
            }

@@ -133,11 +133,11 @@ macro_rules! impl_data_type_for_duration {
            })
        }

        fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
        fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
            match value {
                ValueRef::Null => Ok(None),
                ValueRef::Duration(t) => match t.unit() {
                    TimeUnit::$unit => Ok(Some([<Duration $unit>](t))),
                    TimeUnit::$unit => Ok(Some([<Duration $unit>](*t))),
                    other => error::CastTypeSnafu {
                        msg: format!(
                            "Failed to cast Duration value with different unit {:?} to {}",

@@ -123,10 +123,10 @@ macro_rules! impl_data_type_for_interval {
            })
        }

        fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
        fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
            match value {
                ValueRef::Null => Ok(None),
                ValueRef::[<Interval $unit>](t) => Ok(Some(t)),
                ValueRef::[<Interval $unit>](t) => Ok(Some(*t)),
                other => error::CastTypeSnafu {
                    msg: format!("Failed to cast value {:?} to {}", other, stringify!([<Interval $unit>])),
                }

@@ -28,17 +28,14 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector};

pub const JSON_TYPE_NAME: &str = "Json";

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
pub enum JsonFormat {
    #[default]
    Jsonb,
}

impl Default for JsonFormat {
    fn default() -> Self {
        Self::Jsonb
    }
}

/// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
/// It utilizes current binary value and vector implementation.
#[derive(

@@ -65,7 +65,11 @@ impl DataType for ListType {
    }

    fn as_arrow_type(&self) -> ArrowDataType {
        let field = Arc::new(Field::new("item", self.item_type.as_arrow_type(), true));
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            self.item_type.as_arrow_type(),
            true,
        ));
        ArrowDataType::List(field)
    }


@@ -80,7 +80,7 @@ pub trait LogicalPrimitiveType: 'static + Sized {
    fn cast_vector(vector: &dyn Vector) -> Result<&PrimitiveVector<Self>>;

    /// Cast value ref to the primitive type.
    fn cast_value_ref(value: ValueRef) -> Result<Option<Self::Wrapper>>;
    fn cast_value_ref(value: &ValueRef) -> Result<Option<Self::Wrapper>>;
}

/// A new type for [WrapperType], complement the `Ord` feature for it.
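`cast_value_ref` switching from `ValueRef` by value to `&ValueRef` follows from the struct support: once `ValueRef` can carry list/struct payloads it is no longer `Copy`, so passing by reference avoids clones, and the match arms now bind references, hence the `Some(*v)`-style derefs for `Copy` leaves in the Date/Time/Duration/Interval hunks around this one. Reduced to essentials, with a hypothetical cut-down enum:

#[derive(Debug, Clone)]
enum ValueRef<'a> {
    Null,
    Int64(i64),
    Struct(Vec<&'a str>), // owned payload: the enum can no longer be Copy
}

fn cast_to_i64(value: &ValueRef<'_>) -> Result<Option<i64>, String> {
    match value {
        ValueRef::Null => Ok(None),
        // Matching through `&ValueRef` binds `v: &i64`, so deref the Copy scalar out.
        ValueRef::Int64(v) => Ok(Some(*v)),
        other => Err(format!("cannot cast {other:?} to i64")),
    }
}

fn main() {
    assert_eq!(cast_to_i64(&ValueRef::Int64(7)), Ok(Some(7)));
    assert_eq!(cast_to_i64(&ValueRef::Null), Ok(None));
}
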
@@ -194,10 +194,10 @@ macro_rules! define_logical_primitive_type {
            })
        }

        fn cast_value_ref(value: ValueRef) -> Result<Option<$Native>> {
        fn cast_value_ref(value: &ValueRef) -> Result<Option<$Native>> {
            match value {
                ValueRef::Null => Ok(None),
                ValueRef::$TypeId(v) => Ok(Some(v.into())),
                ValueRef::$TypeId(v) => Ok(Some((*v).into())),
                other => error::CastTypeSnafu {
                    msg: format!(
                        "Failed to cast value {:?} to primitive type {}",

@@ -89,8 +89,8 @@ impl DataType for StringType {
            Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))),
            Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))),

            // StringBytes is only support for utf-8, Value::Binary is not allowed.
            Value::Binary(_) | Value::List(_) => None,
            // StringBytes is only support for utf-8, Value::Binary and collections are not allowed.
            Value::Binary(_) | Value::List(_) | Value::Struct(_) => None,
        }
    }
}

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};

use crate::prelude::{ConcreteDataType, DataType, LogicalTypeId};
use crate::value::Value;
use crate::vectors::StructVectorBuilder;

#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct StructType {
@@ -31,7 +32,7 @@ impl TryFrom<&Fields> for StructType {
            .iter()
            .map(|field| {
                Ok(StructField::new(
                    field.name().to_string(),
                    field.name().clone(),
                    ConcreteDataType::try_from(field.data_type())?,
                    field.is_nullable(),
                ))
@@ -68,16 +69,15 @@ impl DataType for StructType {
    }

    fn as_arrow_type(&self) -> ArrowDataType {
        let fields = self
            .fields
            .iter()
            .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable))
            .collect();
        let fields = self.as_arrow_fields();
        ArrowDataType::Struct(fields)
    }

    fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn crate::prelude::MutableVector> {
        unimplemented!("What is the mutable vector for StructVector?");
    fn create_mutable_vector(&self, capacity: usize) -> Box<dyn crate::prelude::MutableVector> {
        Box::new(StructVectorBuilder::with_type_and_capacity(
            self.clone(),
            capacity,
        ))
    }

    fn try_cast(&self, _from: Value) -> Option<Value> {
@@ -94,6 +94,13 @@ impl StructType {
    pub fn fields(&self) -> &[StructField] {
        &self.fields
    }

    pub fn as_arrow_fields(&self) -> Fields {
        self.fields
            .iter()
            .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable))
            .collect()
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]

@@ -149,14 +149,14 @@ macro_rules! impl_data_type_for_time {
            })
        }

        fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
        fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
            match value {
                ValueRef::Null => Ok(None),
                ValueRef::Int64(v) =>{
                    Ok(Some([<Time $unit>]::from(v)))
                    Ok(Some([<Time $unit>]::from(*v)))
                }
                ValueRef::Time(t) => match t.unit() {
                    TimeUnit::$unit => Ok(Some([<Time $unit>](t))),
                    TimeUnit::$unit => Ok(Some([<Time $unit>](*t))),
                    other => error::CastTypeSnafu {
                        msg: format!(
                            "Failed to cast Time value with different unit {:?} to {}",
Some files were not shown because too many files have changed in this diff.