fix: Fix common grpc expr (#730)

* fix compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename fn names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wranings in common-time

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-12-09 14:24:04 +08:00
committed by GitHub
parent d0892bf0b7
commit 42fdc7251a
12 changed files with 142 additions and 61 deletions

View File

@@ -222,7 +222,9 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
// Timestamp in key part is intentionally left to 0
columns_values.insert(
"timestamp".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _,
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
0,
)])) as _,
);
columns_values.insert(
@@ -232,14 +234,14 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
columns_values.insert(
"gmt_created".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
);

View File

@@ -22,11 +22,11 @@ use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateEx
use common_base::BitVec;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
use datatypes::data_type::ConcreteDataType;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ValueRef, VectorRef};
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::VectorBuilder;
use datatypes::vectors::MutableVector;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
@@ -99,7 +99,7 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let column_datatype = wrapper.datatype();
let rows = rows as usize;
let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows);
let mut vector = ConcreteDataType::from(wrapper).create_mutable_vector(rows);
if let Some(values) = &column.values {
let values = collect_column_values(column_datatype, values);
@@ -110,21 +110,31 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
for i in 0..rows {
if let Some(true) = nulls_iter.next() {
vector.push_null();
vector
.push_value_ref(ValueRef::Null)
.context(CreateVectorSnafu)?;
} else {
let value_ref = values_iter.next().context(InvalidColumnProtoSnafu {
err_msg: format!(
"value not found at position {} of column {}",
i, &column.column_name
),
})?;
vector.try_push_ref(value_ref).context(CreateVectorSnafu)?;
let value_ref = values_iter
.next()
.with_context(|| InvalidColumnProtoSnafu {
err_msg: format!(
"value not found at position {} of column {}",
i, &column.column_name
),
})?;
vector
.push_value_ref(value_ref)
.context(CreateVectorSnafu)?;
}
}
} else {
(0..rows).for_each(|_| vector.push_null());
(0..rows).try_for_each(|_| {
vector
.push_value_ref(ValueRef::Null)
.context(CreateVectorSnafu)
})?;
}
Ok(vector.finish())
Ok(vector.to_vector())
}
fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec<ValueRef> {
@@ -174,9 +184,24 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
DateTime::new(*v)
))
}
ColumnDataType::Timestamp => {
ColumnDataType::TimestampSecond => {
collect_values!(values.ts_second_values, |v| ValueRef::Timestamp(
Timestamp::new_second(*v)
))
}
ColumnDataType::TimestampMillisecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::from_millis(*v)
Timestamp::new_millisecond(*v)
))
}
ColumnDataType::TimestampMicrosecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::new_microsecond(*v)
))
}
ColumnDataType::TimestampNanosecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::new_nanosecond(*v)
))
}
}
@@ -289,10 +314,7 @@ pub fn insertion_expr_to_request(
},
)?;
let data_type = &column_schema.data_type;
entry.insert(VectorBuilder::with_capacity(
data_type.clone(),
row_count as usize,
))
entry.insert(data_type.create_mutable_vector(row_count as usize))
}
};
add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?;
@@ -300,7 +322,7 @@ pub fn insertion_expr_to_request(
}
let columns_values = columns_builders
.into_iter()
.map(|(column_name, mut vector_builder)| (column_name, vector_builder.finish()))
.map(|(column_name, mut vector_builder)| (column_name, vector_builder.to_vector()))
.collect();
Ok(InsertRequest {
@@ -312,7 +334,7 @@ pub fn insertion_expr_to_request(
}
fn add_values_to_builder(
builder: &mut VectorBuilder,
builder: &mut Box<dyn MutableVector>,
values: Values,
row_count: usize,
null_mask: Vec<u8>,
@@ -323,9 +345,11 @@ fn add_values_to_builder(
if null_mask.is_empty() {
ensure!(values.len() == row_count, IllegalInsertDataSnafu);
values.iter().for_each(|value| {
builder.push(value);
});
values.iter().try_for_each(|value| {
builder
.push_value_ref(value.as_value_ref())
.context(CreateVectorSnafu)
})?;
} else {
let null_mask = BitVec::from_vec(null_mask);
ensure!(
@@ -336,9 +360,13 @@ fn add_values_to_builder(
let mut idx_of_values = 0;
for idx in 0..row_count {
match is_null(&null_mask, idx) {
Some(true) => builder.push(&Value::Null),
Some(true) => builder
.push_value_ref(ValueRef::Null)
.context(CreateVectorSnafu)?,
_ => {
builder.push(&values[idx_of_values]);
builder
.push_value_ref(values[idx_of_values].as_value_ref())
.context(CreateVectorSnafu)?;
idx_of_values += 1
}
}
@@ -420,7 +448,7 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
ConcreteDataType::Timestamp(_) => values
.ts_millisecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::from_millis(v)))
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
.collect(),
ConcreteDataType::Null(_) => unreachable!(),
ConcreteDataType::List(_) => unreachable!(),
@@ -624,8 +652,8 @@ mod tests {
assert_eq!(Value::Float64(0.1.into()), memory.get(1));
let ts = insert_req.columns_values.get("ts").unwrap();
assert_eq!(Value::Timestamp(Timestamp::from_millis(100)), ts.get(0));
assert_eq!(Value::Timestamp(Timestamp::from_millis(101)), ts.get(1));
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0));
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1));
}
#[test]
@@ -753,7 +781,7 @@ mod tests {
semantic_type: TIMESTAMP_SEMANTIC_TYPE,
values: Some(ts_vals),
null_mask: vec![0],
datatype: ColumnDataType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
};
(

View File

@@ -98,7 +98,7 @@ mod tests {
Date::from_str("1969-01-01").unwrap().to_string()
);
let now = Utc::now().date().format("%F").to_string();
let now = Utc::now().date_naive().format("%F").to_string();
assert_eq!(now, Date::from_str(&now).unwrap().to_string());
}

View File

@@ -35,13 +35,34 @@ impl Timestamp {
Self { unit, value }
}
pub fn from_millis(value: i64) -> Self {
pub fn new_second(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Second,
}
}
pub fn new_millisecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Millisecond,
}
}
pub fn new_microsecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Microsecond,
}
}
pub fn new_nanosecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Nanosecond,
}
}
pub fn unit(&self) -> TimeUnit {
self.unit
}
@@ -274,10 +295,11 @@ mod tests {
// but expected timestamp is in UTC timezone
fn check_from_str(s: &str, expect: &str) {
let ts = Timestamp::from_str(s).unwrap();
let time = NaiveDateTime::from_timestamp(
let time = NaiveDateTime::from_timestamp_opt(
ts.value / 1_000_000_000,
(ts.value % 1_000_000_000) as u32,
);
)
.unwrap();
assert_eq!(expect, time.to_string());
}
@@ -290,7 +312,13 @@ mod tests {
check_from_str(
"2020-09-08 13:42:29",
&NaiveDateTime::from_timestamp_opt(
1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64,
1599572549
- Local
.timestamp_opt(0, 0)
.unwrap()
.offset()
.fix()
.local_minus_utc() as i64,
0,
)
.unwrap()
@@ -300,7 +328,13 @@ mod tests {
check_from_str(
"2020-09-08T13:42:29",
&NaiveDateTime::from_timestamp_opt(
1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64,
1599572549
- Local
.timestamp_opt(0, 0)
.unwrap()
.offset()
.fix()
.local_minus_utc() as i64,
0,
)
.unwrap()
@@ -310,7 +344,13 @@ mod tests {
check_from_str(
"2020-09-08 13:42:29.042",
&NaiveDateTime::from_timestamp_opt(
1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64,
1599572549
- Local
.timestamp_opt(0, 0)
.unwrap()
.offset()
.fix()
.local_minus_utc() as i64,
42000000,
)
.unwrap()
@@ -321,7 +361,13 @@ mod tests {
check_from_str(
"2020-09-08T13:42:29.042",
&NaiveDateTime::from_timestamp_opt(
1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64,
1599572549
- Local
.timestamp_opt(0, 0)
.unwrap()
.offset()
.fix()
.local_minus_utc() as i64,
42000000,
)
.unwrap()
@@ -341,19 +387,19 @@ mod tests {
assert_eq!(datetime_str, ts.to_iso8601_string());
let ts_millis = 1668070237000;
let ts = Timestamp::from_millis(ts_millis);
let ts = Timestamp::new_millisecond(ts_millis);
assert_eq!("2022-11-10 08:50:37+0000", ts.to_iso8601_string());
let ts_millis = -1000;
let ts = Timestamp::from_millis(ts_millis);
let ts = Timestamp::new_millisecond(ts_millis);
assert_eq!("1969-12-31 23:59:59+0000", ts.to_iso8601_string());
let ts_millis = -1;
let ts = Timestamp::from_millis(ts_millis);
let ts = Timestamp::new_millisecond(ts_millis);
assert_eq!("1969-12-31 23:59:59.999+0000", ts.to_iso8601_string());
let ts_millis = -1001;
let ts = Timestamp::from_millis(ts_millis);
let ts = Timestamp::new_millisecond(ts_millis);
assert_eq!("1969-12-31 23:59:58.999+0000", ts.to_iso8601_string());
}

View File

@@ -33,8 +33,8 @@ mod tests {
.duration_since(time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let datetime_now = chrono::Utc.timestamp_millis(now);
let datetime_std = chrono::Utc.timestamp_millis(millis_from_std);
let datetime_now = chrono::Utc.timestamp_millis_opt(now).unwrap();
let datetime_std = chrono::Utc.timestamp_millis_opt(millis_from_std).unwrap();
assert_eq!(datetime_std.year(), datetime_now.year());
assert_eq!(datetime_std.month(), datetime_now.month());

View File

@@ -288,11 +288,11 @@ mod tests {
let ts = &columns_values["ts"];
assert_eq!(2, ts.len());
assert_eq!(
Value::from(Timestamp::from_millis(1655276557000i64)),
Value::from(Timestamp::new_millisecond(1655276557000i64)),
ts.get(0)
);
assert_eq!(
Value::from(Timestamp::from_millis(1655276558000i64)),
Value::from(Timestamp::new_millisecond(1655276558000i64)),
ts.get(1)
);
}

View File

@@ -1085,7 +1085,7 @@ mod tests {
);
check_type_and_value(
&ConcreteDataType::timestamp_millisecond_datatype(),
&Value::Timestamp(Timestamp::from_millis(1)),
&Value::Timestamp(Timestamp::new_millisecond(1)),
);
}
@@ -1180,7 +1180,7 @@ mod tests {
assert_eq!(
serde_json::Value::Number(1.into()),
to_json(Value::Timestamp(Timestamp::from_millis(1)))
to_json(Value::Timestamp(Timestamp::new_millisecond(1)))
);
let json_value: serde_json::Value =
@@ -1238,7 +1238,7 @@ mod tests {
check_as_value_ref!(Int64, -12);
check_as_value_ref!(Float32, OrderedF32::from(16.0));
check_as_value_ref!(Float64, OrderedF64::from(16.0));
check_as_value_ref!(Timestamp, Timestamp::from_millis(1));
check_as_value_ref!(Timestamp, Timestamp::new_millisecond(1));
assert_eq!(
ValueRef::String("hello"),

View File

@@ -875,7 +875,7 @@ pub fn pyobj_try_to_typed_val(
// FIXME(dennis): we always consider the timestamp unit is millis, it's not correct if user define timestamp column with other units.
obj.try_into_value::<i64>(vm)
.ok()
.map(Timestamp::from_millis)
.map(Timestamp::new_millisecond)
.map(value::Value::Timestamp)
}
_ => unreachable!(),

View File

@@ -104,17 +104,19 @@ impl ScriptsTable {
// Timestamp in key part is intentionally left to 0
columns_values.insert(
"timestamp".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _,
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
0,
)])) as _,
);
columns_values.insert(
"gmt_created".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond(
util::current_time_millis(),
)])) as _,
);

View File

@@ -208,9 +208,9 @@ mod tests {
let cpu = columns.get("cpu").unwrap();
let expected: Vec<Value> = vec![
Value::Timestamp(Timestamp::from_millis(1665893727685_i64)),
Value::Timestamp(Timestamp::from_millis(1665893727686_i64)),
Value::Timestamp(Timestamp::from_millis(1665893727689_i64)),
Value::Timestamp(Timestamp::new_millisecond(1665893727685_i64)),
Value::Timestamp(Timestamp::new_millisecond(1665893727686_i64)),
Value::Timestamp(Timestamp::new_millisecond(1665893727689_i64)),
];
assert_vector(&expected, ts);

View File

@@ -73,7 +73,7 @@ fn kvs_with_index(
UInt64VectorBuilder::with_capacity(keys.len()),
);
for key in keys {
key_builders.0.push(Some(Timestamp::from_millis(key.0)));
key_builders.0.push(Some(Timestamp::new_millisecond(key.0)));
key_builders.1.push(Some(key.1));
}
let row_keys = vec![

View File

@@ -195,7 +195,10 @@ mod tests {
for i in 0..row_num {
let ts = batch.column(0).get(i);
let v = batch.column(1).get(i);
assert_eq!(Value::Timestamp(Timestamp::from_millis(data[index].0)), ts);
assert_eq!(
Value::Timestamp(Timestamp::new_millisecond(data[index].0)),
ts
);
assert_eq!(Value::from(data[index].1), v);
assert_eq!(Value::from(sequence), batch.column(2).get(i));