mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
refactor: per reviews
This commit is contained in:
@@ -148,8 +148,8 @@ impl Flownode for FlownodeManager {
|
||||
|
||||
let rows: Vec<DiffRow> = rows_proto
|
||||
.into_iter()
|
||||
.map(repr::Row::from)
|
||||
.map(|r| {
|
||||
let r = repr::Row::from(r);
|
||||
let reordered = fetch_order
|
||||
.iter()
|
||||
.map(|&i| r.inner[i].clone())
|
||||
|
||||
@@ -303,7 +303,7 @@ impl FlownodeContext {
|
||||
.get_by_name(table_name)
|
||||
.map(|(_, gid)| gid)
|
||||
.unwrap();
|
||||
self.schema.insert(gid, schema.into_named(vec![]));
|
||||
self.schema.insert(gid, schema.into_unnamed());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -122,17 +122,17 @@ impl TableSource {
|
||||
];
|
||||
|
||||
let raw_schema = table_info_value.table_info.meta.schema;
|
||||
let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema
|
||||
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
|
||||
.column_schemas
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
(
|
||||
col.name.clone(),
|
||||
ColumnType {
|
||||
nullable: col.is_nullable(),
|
||||
scalar_type: col.data_type,
|
||||
},
|
||||
col.name,
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
@@ -263,9 +263,18 @@ impl RelationType {
|
||||
true
|
||||
}
|
||||
|
||||
/// Return relation describe with column names
|
||||
pub fn into_named(self, names: Vec<ColumnName>) -> RelationDesc {
|
||||
RelationDesc { typ: self, names }
|
||||
}
|
||||
|
||||
/// Return relation describe without column names
|
||||
pub fn into_unnamed(self) -> RelationDesc {
|
||||
RelationDesc {
|
||||
typ: self,
|
||||
names: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The type of a `Value`
|
||||
|
||||
@@ -211,7 +211,7 @@ mod test {
|
||||
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
|
||||
|
||||
tri_map.insert(Some(name.clone()), Some(1024), gid);
|
||||
schemas.insert(gid, schema.into_named(vec![]));
|
||||
schemas.insert(gid, schema.into_unnamed());
|
||||
}
|
||||
|
||||
{
|
||||
@@ -225,7 +225,7 @@ mod test {
|
||||
ColumnType::new(CDT::uint32_datatype(), false),
|
||||
ColumnType::new(CDT::datetime_datatype(), false),
|
||||
]);
|
||||
schemas.insert(gid, schema.into_named(vec![]));
|
||||
schemas.insert(gid, schema.into_unnamed());
|
||||
tri_map.insert(Some(name.clone()), Some(1025), gid);
|
||||
}
|
||||
|
||||
|
||||
@@ -101,7 +101,6 @@ impl TypedExpr {
|
||||
.unzip();
|
||||
|
||||
match arg_len {
|
||||
// because variadic function can also have 1 arguments, we need to check if it's a variadic function first
|
||||
1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => {
|
||||
let func = UnaryFunc::from_str_and_type(fn_name, None)?;
|
||||
let arg = arg_exprs[0].clone();
|
||||
@@ -124,7 +123,6 @@ impl TypedExpr {
|
||||
|
||||
Ok(TypedExpr::new(arg.call_unary(func), ret_type))
|
||||
}
|
||||
// because variadic function can also have 2 arguments, we need to check if it's a variadic function first
|
||||
2 if BinaryFunc::from_str_expr_and_type(
|
||||
fn_name,
|
||||
&arg_exprs,
|
||||
|
||||
@@ -101,7 +101,14 @@ impl KeyExpiryManager {
|
||||
.or_default()
|
||||
.insert(row.clone());
|
||||
|
||||
self.get_expire_duration(now, row)
|
||||
if let Some(expire_time) = self.compute_expiration_timestamp(now) {
|
||||
if expire_time > event_ts {
|
||||
// return how much time it's expired
|
||||
return Ok(Some(expire_time - event_ts));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Get the expire duration of a key, if it's expired by now.
|
||||
|
||||
Reference in New Issue
Block a user