mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 11:50:38 +00:00
refactor: rename "value" semantic type to "field" (#1326)
* global replace
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* change desc table
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* update sqlness result
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -51,7 +51,7 @@ impl EmptyMetric {
|
||||
end: Millisecond,
|
||||
interval: Millisecond,
|
||||
time_index_column_name: String,
|
||||
value_column_name: String,
|
||||
field_column_name: String,
|
||||
) -> DataFusionResult<Self> {
|
||||
let schema = Arc::new(DFSchema::new_with_metadata(
|
||||
vec![
|
||||
@@ -61,7 +61,7 @@ impl EmptyMetric {
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
DFField::new(Some(""), &value_column_name, DataType::Float64, true),
|
||||
DFField::new(Some(""), &field_column_name, DataType::Float64, true),
|
||||
],
|
||||
HashMap::new(),
|
||||
)?);
|
||||
@@ -253,11 +253,11 @@ mod test {
|
||||
end: Millisecond,
|
||||
interval: Millisecond,
|
||||
time_column_name: String,
|
||||
value_column_name: String,
|
||||
field_column_name: String,
|
||||
expected: String,
|
||||
) {
|
||||
let empty_metric =
|
||||
EmptyMetric::new(start, end, interval, time_column_name, value_column_name).unwrap();
|
||||
EmptyMetric::new(start, end, interval, time_column_name, field_column_name).unwrap();
|
||||
let empty_metric_exec = empty_metric.to_execution_plan();
|
||||
|
||||
let session_context = SessionContext::default();
|
||||
|
||||
@@ -388,11 +388,11 @@ mod test {
|
||||
180_000, 240_000, // every 60s
|
||||
241_000, 271_000, 291_000, // others
|
||||
])) as _;
|
||||
let value_column = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
|
||||
let field_column = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
|
||||
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
|
||||
let data = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![timestamp_column, value_column, path_column],
|
||||
vec![timestamp_column, field_column, path_column],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -299,12 +299,12 @@ mod test {
|
||||
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
|
||||
60_000, 120_000, 0, 30_000, 90_000,
|
||||
])) as _;
|
||||
let value_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
|
||||
let field_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
|
||||
let path_column =
|
||||
Arc::new(StringArray::from_slice(["foo", "foo", "foo", "foo", "foo"])) as _;
|
||||
let data = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![timestamp_column, value_column, path_column],
|
||||
vec![timestamp_column, field_column, path_column],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ pub struct RangeManipulate {
|
||||
range: Millisecond,
|
||||
|
||||
time_index: String,
|
||||
value_columns: Vec<String>,
|
||||
field_columns: Vec<String>,
|
||||
input: LogicalPlan,
|
||||
output_schema: DFSchemaRef,
|
||||
}
|
||||
@@ -68,18 +68,18 @@ impl RangeManipulate {
|
||||
interval: Millisecond,
|
||||
range: Millisecond,
|
||||
time_index: String,
|
||||
value_columns: Vec<String>,
|
||||
field_columns: Vec<String>,
|
||||
input: LogicalPlan,
|
||||
) -> DataFusionResult<Self> {
|
||||
let output_schema =
|
||||
Self::calculate_output_schema(input.schema(), &time_index, &value_columns)?;
|
||||
Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
|
||||
Ok(Self {
|
||||
start,
|
||||
end,
|
||||
interval,
|
||||
range,
|
||||
time_index,
|
||||
value_columns,
|
||||
field_columns,
|
||||
input,
|
||||
output_schema,
|
||||
})
|
||||
@@ -100,7 +100,7 @@ impl RangeManipulate {
|
||||
fn calculate_output_schema(
|
||||
input_schema: &DFSchemaRef,
|
||||
time_index: &str,
|
||||
value_columns: &[String],
|
||||
field_columns: &[String],
|
||||
) -> DataFusionResult<DFSchemaRef> {
|
||||
let mut columns = input_schema.fields().clone();
|
||||
|
||||
@@ -118,7 +118,7 @@ impl RangeManipulate {
|
||||
)));
|
||||
|
||||
// process value columns
|
||||
for name in value_columns {
|
||||
for name in field_columns {
|
||||
let Some(index) = input_schema.index_of_column_by_name(None, name)? else {
|
||||
return Err(datafusion::common::field_not_found(None::<TableReference>, name, input_schema.as_ref()))
|
||||
};
|
||||
@@ -139,7 +139,7 @@ impl RangeManipulate {
|
||||
range: self.range,
|
||||
time_index_column: self.time_index.clone(),
|
||||
time_range_column: self.range_timestamp_name(),
|
||||
value_columns: self.value_columns.clone(),
|
||||
field_columns: self.field_columns.clone(),
|
||||
input: exec_input,
|
||||
output_schema: SchemaRef::new(self.output_schema.as_ref().into()),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
@@ -168,7 +168,7 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
|
||||
write!(
|
||||
f,
|
||||
"PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
|
||||
self.start, self.end, self.interval, self.range, self.time_index, self.value_columns
|
||||
self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
|
||||
)
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
|
||||
interval: self.interval,
|
||||
range: self.range,
|
||||
time_index: self.time_index.clone(),
|
||||
value_columns: self.value_columns.clone(),
|
||||
field_columns: self.field_columns.clone(),
|
||||
input: inputs[0].clone(),
|
||||
output_schema: self.output_schema.clone(),
|
||||
}
|
||||
@@ -196,7 +196,7 @@ pub struct RangeManipulateExec {
|
||||
range: Millisecond,
|
||||
time_index_column: String,
|
||||
time_range_column: String,
|
||||
value_columns: Vec<String>,
|
||||
field_columns: Vec<String>,
|
||||
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
output_schema: SchemaRef,
|
||||
@@ -240,7 +240,7 @@ impl ExecutionPlan for RangeManipulateExec {
|
||||
range: self.range,
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
time_range_column: self.time_range_column.clone(),
|
||||
value_columns: self.value_columns.clone(),
|
||||
field_columns: self.field_columns.clone(),
|
||||
output_schema: self.output_schema.clone(),
|
||||
input: children[0].clone(),
|
||||
metric: self.metric.clone(),
|
||||
@@ -260,8 +260,8 @@ impl ExecutionPlan for RangeManipulateExec {
|
||||
.column_with_name(&self.time_index_column)
|
||||
.unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
|
||||
.0;
|
||||
let value_columns = self
|
||||
.value_columns
|
||||
let field_columns = self
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(|value_col| {
|
||||
schema
|
||||
@@ -276,7 +276,7 @@ impl ExecutionPlan for RangeManipulateExec {
|
||||
interval: self.interval,
|
||||
range: self.range,
|
||||
time_index,
|
||||
value_columns,
|
||||
field_columns,
|
||||
output_schema: self.output_schema.clone(),
|
||||
input,
|
||||
metric: baseline_metric,
|
||||
@@ -325,7 +325,7 @@ pub struct RangeManipulateStream {
|
||||
interval: Millisecond,
|
||||
range: Millisecond,
|
||||
time_index: usize,
|
||||
value_columns: Vec<usize>,
|
||||
field_columns: Vec<usize>,
|
||||
|
||||
output_schema: SchemaRef,
|
||||
input: SendableRecordBatchStream,
|
||||
@@ -364,7 +364,7 @@ impl RangeManipulateStream {
|
||||
|
||||
// transform columns
|
||||
let mut new_columns = input.columns().to_vec();
|
||||
for index in self.value_columns.iter() {
|
||||
for index in self.field_columns.iter() {
|
||||
other_columns.remove(index);
|
||||
let column = input.column(*index);
|
||||
let new_column = Arc::new(
|
||||
@@ -460,14 +460,14 @@ mod test {
|
||||
180_000, 240_000, // every 60s
|
||||
241_000, 271_000, 291_000, // others
|
||||
])) as _;
|
||||
let value_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
|
||||
let field_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
|
||||
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
|
||||
let data = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
timestamp_column,
|
||||
value_column.clone(),
|
||||
value_column,
|
||||
field_column.clone(),
|
||||
field_column,
|
||||
path_column,
|
||||
],
|
||||
)
|
||||
@@ -485,12 +485,12 @@ mod test {
|
||||
) {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
let time_index = TIME_INDEX_COLUMN.to_string();
|
||||
let value_columns = vec!["value_1".to_string(), "value_2".to_string()];
|
||||
let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
|
||||
let manipulate_output_schema = SchemaRef::new(
|
||||
RangeManipulate::calculate_output_schema(
|
||||
&memory_exec.schema().to_dfschema_ref().unwrap(),
|
||||
&time_index,
|
||||
&value_columns,
|
||||
&field_columns,
|
||||
)
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
@@ -501,7 +501,7 @@ mod test {
|
||||
end,
|
||||
interval,
|
||||
range,
|
||||
value_columns,
|
||||
field_columns,
|
||||
output_schema: manipulate_output_schema,
|
||||
time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
|
||||
time_index_column: time_index,
|
||||
|
||||
@@ -62,7 +62,7 @@ const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
|
||||
const SPECIAL_TIME_FUNCTION: &str = "time";
|
||||
|
||||
/// default value column name for empty metric
|
||||
const DEFAULT_VALUE_COLUMN: &str = "value";
|
||||
const DEFAULT_FIELD_COLUMN: &str = "value";
|
||||
|
||||
/// Special modifier to project field columns under multi-field mode
|
||||
const FIELD_COLUMN_MATCHER: &str = "__field__";
|
||||
@@ -78,7 +78,7 @@ struct PromPlannerContext {
|
||||
// planner states
|
||||
table_name: Option<String>,
|
||||
time_index_column: Option<String>,
|
||||
value_columns: Vec<String>,
|
||||
field_columns: Vec<String>,
|
||||
tag_columns: Vec<String>,
|
||||
field_column_matcher: Option<Vec<Matcher>>,
|
||||
/// The range in millisecond of range selector. None if there is no range selector.
|
||||
@@ -156,7 +156,7 @@ impl PromPlanner {
|
||||
PromExpr::Unary(UnaryExpr { expr }) => {
|
||||
// Unary Expr in PromQL implies the `-` operator
|
||||
let input = self.prom_expr_to_plan(*expr.clone()).await?;
|
||||
self.projection_for_each_value_column(input, |col| {
|
||||
self.projection_for_each_field_column(input, |col| {
|
||||
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
|
||||
})?
|
||||
}
|
||||
@@ -204,9 +204,9 @@ impl PromPlanner {
|
||||
Ok(binary_expr)
|
||||
};
|
||||
if is_comparison_op && !should_return_bool {
|
||||
self.filter_on_value_column(input, bin_expr_builder)?
|
||||
self.filter_on_field_column(input, bin_expr_builder)?
|
||||
} else {
|
||||
self.projection_for_each_value_column(input, bin_expr_builder)?
|
||||
self.projection_for_each_field_column(input, bin_expr_builder)?
|
||||
}
|
||||
}
|
||||
// lhs is a column, rhs is a literal
|
||||
@@ -227,27 +227,27 @@ impl PromPlanner {
|
||||
Ok(binary_expr)
|
||||
};
|
||||
if is_comparison_op && !should_return_bool {
|
||||
self.filter_on_value_column(input, bin_expr_builder)?
|
||||
self.filter_on_field_column(input, bin_expr_builder)?
|
||||
} else {
|
||||
self.projection_for_each_value_column(input, bin_expr_builder)?
|
||||
self.projection_for_each_field_column(input, bin_expr_builder)?
|
||||
}
|
||||
}
|
||||
// both are columns. join them on time index
|
||||
(None, None) => {
|
||||
let left_input = self.prom_expr_to_plan(*lhs.clone()).await?;
|
||||
let left_value_columns = self.ctx.value_columns.clone();
|
||||
let left_field_columns = self.ctx.field_columns.clone();
|
||||
let left_schema = left_input.schema().clone();
|
||||
|
||||
let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
|
||||
let right_value_columns = self.ctx.value_columns.clone();
|
||||
let right_field_columns = self.ctx.field_columns.clone();
|
||||
let right_schema = right_input.schema().clone();
|
||||
|
||||
let mut value_columns =
|
||||
left_value_columns.iter().zip(right_value_columns.iter());
|
||||
// the new ctx.value_columns for the generated join plan
|
||||
let join_plan = self.join_on_non_value_columns(left_input, right_input)?;
|
||||
let mut field_columns =
|
||||
left_field_columns.iter().zip(right_field_columns.iter());
|
||||
// the new ctx.field_columns for the generated join plan
|
||||
let join_plan = self.join_on_non_field_columns(left_input, right_input)?;
|
||||
let bin_expr_builder = |_: &String| {
|
||||
let (left_col_name, right_col_name) = value_columns.next().unwrap();
|
||||
let (left_col_name, right_col_name) = field_columns.next().unwrap();
|
||||
let left_col = left_schema
|
||||
.field_with_name(None, left_col_name)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
@@ -271,9 +271,9 @@ impl PromPlanner {
|
||||
Ok(binary_expr)
|
||||
};
|
||||
if is_comparison_op && !should_return_bool {
|
||||
self.filter_on_value_column(join_plan, bin_expr_builder)?
|
||||
self.filter_on_field_column(join_plan, bin_expr_builder)?
|
||||
} else {
|
||||
self.projection_for_each_value_column(join_plan, bin_expr_builder)?
|
||||
self.projection_for_each_field_column(join_plan, bin_expr_builder)?
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -344,7 +344,7 @@ impl PromPlanner {
|
||||
.time_index_column
|
||||
.clone()
|
||||
.expect("time index should be set in `setup_context`"),
|
||||
self.ctx.value_columns.clone(),
|
||||
self.ctx.field_columns.clone(),
|
||||
normalize,
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
@@ -357,7 +357,7 @@ impl PromPlanner {
|
||||
// TODO(ruihang): refactor this, transform the AST in advance to include an empty metric table.
|
||||
if func.name == SPECIAL_TIME_FUNCTION {
|
||||
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
|
||||
self.ctx.value_columns = vec![DEFAULT_VALUE_COLUMN.to_string()];
|
||||
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
|
||||
self.ctx.table_name = Some(String::new());
|
||||
|
||||
return Ok(LogicalPlan::Extension(Extension {
|
||||
@@ -367,7 +367,7 @@ impl PromPlanner {
|
||||
self.ctx.end,
|
||||
self.ctx.interval,
|
||||
SPECIAL_TIME_FUNCTION.to_string(),
|
||||
DEFAULT_VALUE_COLUMN.to_string(),
|
||||
DEFAULT_FIELD_COLUMN.to_string(),
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?,
|
||||
),
|
||||
@@ -447,7 +447,7 @@ impl PromPlanner {
|
||||
|
||||
// make a projection plan if there is any `__field__` matcher
|
||||
if let Some(field_matchers) = &self.ctx.field_column_matcher {
|
||||
let col_set = self.ctx.value_columns.iter().collect::<HashSet<_>>();
|
||||
let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
|
||||
// opt-in set
|
||||
let mut result_set = HashSet::new();
|
||||
// opt-out set
|
||||
@@ -475,14 +475,14 @@ impl PromPlanner {
|
||||
}
|
||||
}
|
||||
MatchOp::Re(regex) => {
|
||||
for col in &self.ctx.value_columns {
|
||||
for col in &self.ctx.field_columns {
|
||||
if regex.is_match(col) {
|
||||
result_set.insert(col.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
MatchOp::NotRe(regex) => {
|
||||
for col in &self.ctx.value_columns {
|
||||
for col in &self.ctx.field_columns {
|
||||
if regex.is_match(col) {
|
||||
reverse_set.insert(col.clone());
|
||||
}
|
||||
@@ -498,7 +498,7 @@ impl PromPlanner {
|
||||
result_set.remove(&col);
|
||||
}
|
||||
|
||||
self.ctx.value_columns = result_set.iter().cloned().collect();
|
||||
self.ctx.field_columns = result_set.iter().cloned().collect();
|
||||
let exprs = result_set
|
||||
.into_iter()
|
||||
.map(|col| DfExpr::Column(col.into()))
|
||||
@@ -589,7 +589,7 @@ impl PromPlanner {
|
||||
if let Some(time_index) = &self.ctx.time_index_column {
|
||||
all_fields.remove(time_index);
|
||||
}
|
||||
for value in &self.ctx.value_columns {
|
||||
for value in &self.ctx.field_columns {
|
||||
all_fields.remove(value);
|
||||
}
|
||||
|
||||
@@ -688,10 +688,10 @@ impl PromPlanner {
|
||||
let values = table
|
||||
.table_info()
|
||||
.meta
|
||||
.value_column_names()
|
||||
.field_column_names()
|
||||
.cloned()
|
||||
.collect();
|
||||
self.ctx.value_columns = values;
|
||||
self.ctx.field_columns = values;
|
||||
|
||||
// set primary key (tag) columns
|
||||
let tags = table
|
||||
@@ -749,7 +749,7 @@ impl PromPlanner {
|
||||
// TODO(ruihang): check function args list
|
||||
|
||||
// TODO(ruihang): set this according to in-param list
|
||||
let value_column_pos = 0;
|
||||
let field_column_pos = 0;
|
||||
let scalar_func = match func.name {
|
||||
"increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf(
|
||||
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
|
||||
@@ -795,19 +795,19 @@ impl PromPlanner {
|
||||
};
|
||||
|
||||
// TODO(ruihang): handle functions that don't require input
|
||||
let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
|
||||
for value in &self.ctx.value_columns {
|
||||
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
|
||||
for value in &self.ctx.field_columns {
|
||||
let col_expr = DfExpr::Column(Column::from_name(value));
|
||||
|
||||
match scalar_func.clone() {
|
||||
ScalarFunc::DataFusionBuiltin(fun) => {
|
||||
other_input_exprs.insert(value_column_pos, col_expr);
|
||||
other_input_exprs.insert(field_column_pos, col_expr);
|
||||
let fn_expr = DfExpr::ScalarFunction {
|
||||
fun,
|
||||
args: other_input_exprs.clone(),
|
||||
};
|
||||
exprs.push(fn_expr);
|
||||
other_input_exprs.remove(value_column_pos);
|
||||
other_input_exprs.remove(field_column_pos);
|
||||
}
|
||||
ScalarFunc::Udf(fun) => {
|
||||
let ts_range_expr = DfExpr::Column(Column::from_name(
|
||||
@@ -815,15 +815,15 @@ impl PromPlanner {
|
||||
self.ctx.time_index_column.as_ref().unwrap(),
|
||||
),
|
||||
));
|
||||
other_input_exprs.insert(value_column_pos, ts_range_expr);
|
||||
other_input_exprs.insert(value_column_pos + 1, col_expr);
|
||||
other_input_exprs.insert(field_column_pos, ts_range_expr);
|
||||
other_input_exprs.insert(field_column_pos + 1, col_expr);
|
||||
let fn_expr = DfExpr::ScalarUDF {
|
||||
fun: Arc::new(fun),
|
||||
args: other_input_exprs.clone(),
|
||||
};
|
||||
exprs.push(fn_expr);
|
||||
other_input_exprs.remove(value_column_pos + 1);
|
||||
other_input_exprs.remove(value_column_pos);
|
||||
other_input_exprs.remove(field_column_pos + 1);
|
||||
other_input_exprs.remove(field_column_pos);
|
||||
}
|
||||
ScalarFunc::ExtrapolateUdf(fun) => {
|
||||
let ts_range_expr = DfExpr::Column(Column::from_name(
|
||||
@@ -831,34 +831,34 @@ impl PromPlanner {
|
||||
self.ctx.time_index_column.as_ref().unwrap(),
|
||||
),
|
||||
));
|
||||
other_input_exprs.insert(value_column_pos, ts_range_expr);
|
||||
other_input_exprs.insert(value_column_pos + 1, col_expr);
|
||||
other_input_exprs.insert(field_column_pos, ts_range_expr);
|
||||
other_input_exprs.insert(field_column_pos + 1, col_expr);
|
||||
other_input_exprs
|
||||
.insert(value_column_pos + 2, self.create_time_index_column_expr()?);
|
||||
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
|
||||
let fn_expr = DfExpr::ScalarUDF {
|
||||
fun: Arc::new(fun),
|
||||
args: other_input_exprs.clone(),
|
||||
};
|
||||
exprs.push(fn_expr);
|
||||
other_input_exprs.remove(value_column_pos + 2);
|
||||
other_input_exprs.remove(value_column_pos + 1);
|
||||
other_input_exprs.remove(value_column_pos);
|
||||
other_input_exprs.remove(field_column_pos + 2);
|
||||
other_input_exprs.remove(field_column_pos + 1);
|
||||
other_input_exprs.remove(field_column_pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update value columns' name, and alias them to remove qualifiers
|
||||
let mut new_value_columns = Vec::with_capacity(exprs.len());
|
||||
let mut new_field_columns = Vec::with_capacity(exprs.len());
|
||||
exprs = exprs
|
||||
.into_iter()
|
||||
.map(|expr| {
|
||||
let display_name = expr.display_name()?;
|
||||
new_value_columns.push(display_name.clone());
|
||||
new_field_columns.push(display_name.clone());
|
||||
Ok(expr.alias(display_name))
|
||||
})
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
self.ctx.value_columns = new_value_columns;
|
||||
self.ctx.field_columns = new_field_columns;
|
||||
|
||||
Ok(exprs)
|
||||
}
|
||||
@@ -893,8 +893,8 @@ impl PromPlanner {
|
||||
}
|
||||
|
||||
fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
|
||||
let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
|
||||
for value in &self.ctx.value_columns {
|
||||
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
|
||||
for value in &self.ctx.field_columns {
|
||||
let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
|
||||
exprs.push(expr);
|
||||
}
|
||||
@@ -936,7 +936,7 @@ impl PromPlanner {
|
||||
// perform aggregate operation to each value column
|
||||
let exprs: Vec<DfExpr> = self
|
||||
.ctx
|
||||
.value_columns
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(|col| {
|
||||
DfExpr::AggregateFunction(AggregateFunction {
|
||||
@@ -949,13 +949,13 @@ impl PromPlanner {
|
||||
.collect();
|
||||
|
||||
// update value column name according to the aggregators
|
||||
let mut new_value_columns = Vec::with_capacity(self.ctx.value_columns.len());
|
||||
let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
|
||||
let normalized_exprs =
|
||||
normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
|
||||
for expr in normalized_exprs {
|
||||
new_value_columns.push(expr.display_name().context(DataFusionPlanningSnafu)?);
|
||||
new_field_columns.push(expr.display_name().context(DataFusionPlanningSnafu)?);
|
||||
}
|
||||
self.ctx.value_columns = new_value_columns;
|
||||
self.ctx.field_columns = new_field_columns;
|
||||
|
||||
Ok(exprs)
|
||||
}
|
||||
@@ -1028,7 +1028,7 @@ impl PromPlanner {
|
||||
|
||||
/// Build an inner join on time index column and tag columns to concat two logical plans.
|
||||
/// The left plan will be aliased as [`LEFT_PLAN_JOIN_ALIAS`].
|
||||
fn join_on_non_value_columns(
|
||||
fn join_on_non_field_columns(
|
||||
&self,
|
||||
left: LogicalPlan,
|
||||
right: LogicalPlan,
|
||||
@@ -1068,7 +1068,7 @@ impl PromPlanner {
|
||||
///
|
||||
/// This function will update the value columns in the context. Those new column names
|
||||
/// don't contain qualifiers.
|
||||
fn projection_for_each_value_column<F>(
|
||||
fn projection_for_each_field_column<F>(
|
||||
&mut self,
|
||||
input: LogicalPlan,
|
||||
name_to_expr: F,
|
||||
@@ -1076,7 +1076,7 @@ impl PromPlanner {
|
||||
where
|
||||
F: FnMut(&String) -> Result<DfExpr>,
|
||||
{
|
||||
let non_value_columns_iter = self
|
||||
let non_field_columns_iter = self
|
||||
.ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
@@ -1089,27 +1089,27 @@ impl PromPlanner {
|
||||
});
|
||||
|
||||
// build computation exprs
|
||||
let result_value_columns = self
|
||||
let result_field_columns = self
|
||||
.ctx
|
||||
.value_columns
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(name_to_expr)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// alias the computation exprs to remove qualifier
|
||||
self.ctx.value_columns = result_value_columns
|
||||
self.ctx.field_columns = result_field_columns
|
||||
.iter()
|
||||
.map(|expr| expr.display_name())
|
||||
.collect::<DfResult<Vec<_>>>()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
let value_columns_iter = result_value_columns
|
||||
let field_columns_iter = result_field_columns
|
||||
.into_iter()
|
||||
.zip(self.ctx.value_columns.iter())
|
||||
.zip(self.ctx.field_columns.iter())
|
||||
.map(|(expr, name)| Ok(DfExpr::Alias(Box::new(expr), name.to_string())));
|
||||
|
||||
// chain non-value columns (unchanged) and value columns (applied computation then alias)
|
||||
let project_fields = non_value_columns_iter
|
||||
.chain(value_columns_iter)
|
||||
let project_fields = non_field_columns_iter
|
||||
.chain(field_columns_iter)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
LogicalPlanBuilder::from(input)
|
||||
@@ -1121,7 +1121,7 @@ impl PromPlanner {
|
||||
|
||||
/// Build a filter plan that filters on the value column. Notice that only one value column
|
||||
/// is expected.
|
||||
fn filter_on_value_column<F>(
|
||||
fn filter_on_field_column<F>(
|
||||
&self,
|
||||
input: LogicalPlan,
|
||||
mut name_to_expr: F,
|
||||
@@ -1130,16 +1130,16 @@ impl PromPlanner {
|
||||
F: FnMut(&String) -> Result<DfExpr>,
|
||||
{
|
||||
ensure!(
|
||||
self.ctx.value_columns.len() == 1,
|
||||
self.ctx.field_columns.len() == 1,
|
||||
UnsupportedExprSnafu {
|
||||
name: "filter on multi-value input"
|
||||
}
|
||||
);
|
||||
|
||||
let value_column_filter = name_to_expr(&self.ctx.value_columns[0])?;
|
||||
let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
|
||||
|
||||
LogicalPlanBuilder::from(input)
|
||||
.filter(value_column_filter)
|
||||
.filter(field_column_filter)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)
|
||||
|
||||
@@ -38,7 +38,7 @@ const COLUMN_DEFAULT_COLUMN: &str = "Default";
|
||||
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
|
||||
|
||||
const SEMANTIC_TYPE_PRIMARY_KEY: &str = "PRIMARY KEY";
|
||||
const SEMANTIC_TYPE_VALUE: &str = "VALUE";
|
||||
const SEMANTIC_TYPE_FIELD: &str = "FIELD";
|
||||
const SEMANTIC_TYPE_TIME_INDEX: &str = "TIME INDEX";
|
||||
|
||||
const NULLABLE_YES: &str = "YES";
|
||||
@@ -217,7 +217,7 @@ fn describe_column_semantic_types(
|
||||
} else if cs.is_time_index() {
|
||||
String::from(SEMANTIC_TYPE_TIME_INDEX)
|
||||
} else {
|
||||
String::from(SEMANTIC_TYPE_VALUE)
|
||||
String::from(SEMANTIC_TYPE_FIELD)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<String>>(),
|
||||
@@ -242,7 +242,7 @@ mod test {
|
||||
use crate::error::Result;
|
||||
use crate::sql::{
|
||||
describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NULLABLE_NO, NULLABLE_YES,
|
||||
SEMANTIC_TYPE_TIME_INDEX, SEMANTIC_TYPE_VALUE,
|
||||
SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
|
||||
};
|
||||
|
||||
#[test]
|
||||
@@ -271,7 +271,7 @@ mod test {
|
||||
Arc::new(StringVector::from(vec![NULLABLE_YES, NULLABLE_NO])) as _,
|
||||
Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
|
||||
Arc::new(StringVector::from(vec![
|
||||
SEMANTIC_TYPE_VALUE,
|
||||
SEMANTIC_TYPE_FIELD,
|
||||
SEMANTIC_TYPE_TIME_INDEX,
|
||||
])) as _,
|
||||
];
|
||||
|
||||
@@ -18,7 +18,7 @@ use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest
|
||||
use crate::error::{self, Result};
|
||||
|
||||
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
|
||||
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value";
|
||||
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataPoint {
|
||||
@@ -139,8 +139,8 @@ impl DataPoint {
|
||||
};
|
||||
columns.push(ts_column);
|
||||
|
||||
let value_column = Column {
|
||||
column_name: OPENTSDB_VALUE_COLUMN_NAME.to_string(),
|
||||
let field_column = Column {
|
||||
column_name: OPENTSDB_FIELD_COLUMN_NAME.to_string(),
|
||||
values: Some(column::Values {
|
||||
f64_values: vec![self.value],
|
||||
..Default::default()
|
||||
@@ -149,7 +149,7 @@ impl DataPoint {
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
..Default::default()
|
||||
};
|
||||
columns.push(value_column);
|
||||
columns.push(field_column);
|
||||
|
||||
for (tagk, tagv) in self.tags.iter() {
|
||||
columns.push(Column {
|
||||
@@ -273,7 +273,7 @@ mod test {
|
||||
vec![1000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME);
|
||||
assert_eq!(columns[1].column_name, OPENTSDB_FIELD_COLUMN_NAME);
|
||||
assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
|
||||
|
||||
assert_eq!(columns[2].column_name, "tagk1");
|
||||
|
||||
@@ -250,7 +250,7 @@ impl PromJsonResponse {
|
||||
// TODO(ruihang): wish there is a better way to do this.
|
||||
let mut timestamp_column_index = None;
|
||||
let mut tag_column_indices = Vec::new();
|
||||
let mut first_value_column_index = None;
|
||||
let mut first_field_column_index = None;
|
||||
|
||||
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
|
||||
match column.data_type {
|
||||
@@ -260,8 +260,8 @@ impl PromJsonResponse {
|
||||
}
|
||||
}
|
||||
ConcreteDataType::Float64(_) => {
|
||||
if first_value_column_index.is_none() {
|
||||
first_value_column_index = Some(i);
|
||||
if first_field_column_index.is_none() {
|
||||
first_field_column_index = Some(i);
|
||||
}
|
||||
}
|
||||
ConcreteDataType::String(_) => {
|
||||
@@ -274,7 +274,7 @@ impl PromJsonResponse {
|
||||
let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
|
||||
err_msg: "no timestamp column found".to_string(),
|
||||
})?;
|
||||
let first_value_column_index = first_value_column_index.context(InternalSnafu {
|
||||
let first_field_column_index = first_field_column_index.context(InternalSnafu {
|
||||
err_msg: "no value column found".to_string(),
|
||||
})?;
|
||||
|
||||
@@ -302,8 +302,8 @@ impl PromJsonResponse {
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondVector>()
|
||||
.unwrap();
|
||||
let value_column = batch
|
||||
.column(first_value_column_index)
|
||||
let field_column = batch
|
||||
.column(first_field_column_index)
|
||||
.as_any()
|
||||
.downcast_ref::<Float64Vector>()
|
||||
.unwrap();
|
||||
@@ -324,7 +324,7 @@ impl PromJsonResponse {
|
||||
|
||||
// retrieve value
|
||||
let value =
|
||||
Into::<f64>::into(value_column.get_data(row_index).unwrap()).to_string();
|
||||
Into::<f64>::into(field_column.get_data(row_index).unwrap()).to_string();
|
||||
|
||||
buffer.entry(tags).or_default().push((timestamp, value));
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use snap::raw::{Decoder, Encoder};
|
||||
use crate::error::{self, Result};
|
||||
|
||||
const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
|
||||
const VALUE_COLUMN_NAME: &str = "greptime_value";
|
||||
const FIELD_COLUMN_NAME: &str = "greptime_value";
|
||||
pub const METRIC_NAME_LABEL: &str = "__name__";
|
||||
|
||||
/// Metrics for push gateway protocol
|
||||
@@ -185,7 +185,7 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
|
||||
));
|
||||
|
||||
for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
|
||||
if column_schema.name == VALUE_COLUMN_NAME
|
||||
if column_schema.name == FIELD_COLUMN_NAME
|
||||
|| column_schema.name == TIMESTAMP_COLUMN_NAME
|
||||
{
|
||||
continue;
|
||||
@@ -235,17 +235,17 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
|
||||
}
|
||||
);
|
||||
|
||||
let value_column = recordbatch.column_by_name(VALUE_COLUMN_NAME).context(
|
||||
let field_column = recordbatch.column_by_name(FIELD_COLUMN_NAME).context(
|
||||
error::InvalidPromRemoteReadQueryResultSnafu {
|
||||
msg: "missing greptime_value column in query result",
|
||||
},
|
||||
)?;
|
||||
ensure!(
|
||||
value_column.data_type() == ConcreteDataType::float64_datatype(),
|
||||
field_column.data_type() == ConcreteDataType::float64_datatype(),
|
||||
error::InvalidPromRemoteReadQueryResultSnafu {
|
||||
msg: format!(
|
||||
"Expect value column of datatype Float64, actual {:?}",
|
||||
value_column.data_type()
|
||||
field_column.data_type()
|
||||
)
|
||||
}
|
||||
);
|
||||
@@ -263,11 +263,11 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
if ts_column.is_null(row) || value_column.is_null(row) {
|
||||
if ts_column.is_null(row) || field_column.is_null(row) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value: f64 = match value_column.get(row) {
|
||||
let value: f64 = match field_column.get(row) {
|
||||
Value::Float64(value) => value.into(),
|
||||
_ => unreachable!("checked by the \"ensure\" above"),
|
||||
};
|
||||
@@ -308,8 +308,8 @@ fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result<GrpcInsertReques
|
||||
};
|
||||
columns.push(ts_column);
|
||||
|
||||
let value_column = Column {
|
||||
column_name: VALUE_COLUMN_NAME.to_string(),
|
||||
let field_column = Column {
|
||||
column_name: FIELD_COLUMN_NAME.to_string(),
|
||||
values: Some(column::Values {
|
||||
f64_values: samples.iter().map(|x| x.value).collect(),
|
||||
..Default::default()
|
||||
@@ -318,7 +318,7 @@ fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result<GrpcInsertReques
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
..Default::default()
|
||||
};
|
||||
columns.push(value_column);
|
||||
columns.push(field_column);
|
||||
|
||||
let mut table_name = None;
|
||||
|
||||
@@ -527,7 +527,7 @@ mod tests {
|
||||
vec![1000, 2000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
|
||||
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().f64_values,
|
||||
vec![1.0, 2.0]
|
||||
@@ -553,7 +553,7 @@ mod tests {
|
||||
vec![1000, 2000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
|
||||
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().f64_values,
|
||||
vec![3.0, 4.0]
|
||||
@@ -584,7 +584,7 @@ mod tests {
|
||||
vec![1000, 2000, 3000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
|
||||
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().f64_values,
|
||||
vec![5.0, 6.0, 7.0]
|
||||
@@ -611,7 +611,7 @@ mod tests {
|
||||
true,
|
||||
),
|
||||
ColumnSchema::new(
|
||||
VALUE_COLUMN_NAME,
|
||||
FIELD_COLUMN_NAME,
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
),
|
||||
|
||||
@@ -28,8 +28,8 @@ pub const TIMESTAMP_NAME: &str = "timestamp";
|
||||
pub fn schema_for_test() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("bench")
|
||||
.enable_version_column(true)
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_value_column(("v2", LogicalTypeId::String, true))
|
||||
.push_field_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_field_column(("v2", LogicalTypeId::String, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ impl RegionDescBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn push_value_column(mut self, column_def: ColumnDef) -> Self {
|
||||
pub fn push_field_column(mut self, column_def: ColumnDef) -> Self {
|
||||
let column = self.new_column(column_def);
|
||||
self.default_cf_builder = self.default_cf_builder.push_column(column);
|
||||
self
|
||||
|
||||
@@ -111,7 +111,7 @@ mod tests {
|
||||
// Just build a region desc and use its columns metadata.
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.enable_version_column(false)
|
||||
.push_value_column(("v", LogicalTypeId::UInt64, true))
|
||||
.push_field_column(("v", LogicalTypeId::UInt64, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
metadata.schema().clone()
|
||||
|
||||
@@ -439,7 +439,7 @@ mod tests {
|
||||
let region_name = "region-0";
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
let ctx = EngineContext::default();
|
||||
let region = engine
|
||||
|
||||
@@ -364,7 +364,7 @@ mod tests {
|
||||
fn test_region_manifest_builder() {
|
||||
let desc = RegionDescBuilder::new("test_region_manifest_builder")
|
||||
.enable_version_column(true)
|
||||
.push_value_column(("v0", LogicalTypeId::Int64, true))
|
||||
.push_field_column(("v0", LogicalTypeId::Int64, true))
|
||||
.build();
|
||||
let region_metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ pub fn build_region_meta() -> RegionMetadata {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.id(0)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
desc.try_into().unwrap()
|
||||
}
|
||||
@@ -37,8 +37,8 @@ pub fn build_altered_region_meta() -> RegionMetadata {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.id(0)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_value_column(("v2", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v2", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
desc.try_into().unwrap()
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ impl BTreeIterator {
|
||||
.map(|column_meta| column_meta.desc.data_type.clone());
|
||||
let value_data_types = self
|
||||
.schema
|
||||
.value_columns()
|
||||
.field_columns()
|
||||
.map(|column_meta| column_meta.desc.data_type.clone());
|
||||
|
||||
let key_columns = rows_to_vectors(
|
||||
@@ -198,7 +198,7 @@ impl BTreeIterator {
|
||||
self.adapter.source_key_needed(),
|
||||
keys.as_slice(),
|
||||
);
|
||||
let value_columns = rows_to_vectors(
|
||||
let field_columns = rows_to_vectors(
|
||||
value_data_types,
|
||||
self.adapter.source_value_needed(),
|
||||
values.as_slice(),
|
||||
@@ -206,7 +206,7 @@ impl BTreeIterator {
|
||||
|
||||
let batch = self.adapter.batch_from_parts(
|
||||
key_columns,
|
||||
value_columns,
|
||||
field_columns,
|
||||
Arc::new(sequences),
|
||||
Arc::new(op_types),
|
||||
)?;
|
||||
|
||||
@@ -150,7 +150,7 @@ mod tests {
|
||||
fn new_region_schema() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
|
||||
.push_value_column(("value", LogicalTypeId::Int64, true))
|
||||
.push_field_column(("value", LogicalTypeId::Int64, true))
|
||||
.enable_version_column(false)
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
@@ -32,8 +32,8 @@ pub fn schema_for_test() -> RegionSchemaRef {
|
||||
// Just build a region desc and use its columns metadata.
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.enable_version_column(true)
|
||||
.push_value_column(("v0", LogicalTypeId::UInt64, true))
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_field_column(("v0", LogicalTypeId::UInt64, true))
|
||||
.push_field_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
|
||||
@@ -494,7 +494,7 @@ impl ColumnsMetadata {
|
||||
}
|
||||
|
||||
/// Returns an iterator to all value columns (internal columns are excluded).
|
||||
pub fn iter_value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
pub fn iter_field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns[self.row_key_end..self.user_column_end].iter()
|
||||
}
|
||||
|
||||
@@ -513,7 +513,7 @@ impl ColumnsMetadata {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_value_columns(&self) -> usize {
|
||||
pub fn num_field_columns(&self) -> usize {
|
||||
self.user_column_end - self.row_key_end
|
||||
}
|
||||
|
||||
@@ -692,16 +692,16 @@ impl ColumnsMetadataBuilder {
|
||||
}
|
||||
|
||||
fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<&mut Self> {
|
||||
self.push_value_column(consts::KEY_CF_ID, desc)
|
||||
self.push_field_column(consts::KEY_CF_ID, desc)
|
||||
}
|
||||
|
||||
fn push_value_column(
|
||||
fn push_field_column(
|
||||
&mut self,
|
||||
cf_id: ColumnFamilyId,
|
||||
desc: ColumnDescriptor,
|
||||
) -> Result<&mut Self> {
|
||||
ensure!(
|
||||
!is_internal_value_column(&desc.name),
|
||||
!is_internal_field_column(&desc.name),
|
||||
ReservedColumnSnafu { name: &desc.name }
|
||||
);
|
||||
|
||||
@@ -844,7 +844,7 @@ impl RegionMetadataBuilder {
|
||||
self.cfs_meta_builder.add_column_family(cf_meta)?;
|
||||
|
||||
for col in cf.columns {
|
||||
self.columns_meta_builder.push_value_column(cf.cf_id, col)?;
|
||||
self.columns_meta_builder.push_field_column(cf.cf_id, col)?;
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
@@ -899,7 +899,7 @@ fn internal_column_descs() -> [ColumnDescriptor; 2] {
|
||||
|
||||
/// Returns true if this is an internal column for value column.
|
||||
#[inline]
|
||||
fn is_internal_value_column(column_name: &str) -> bool {
|
||||
fn is_internal_field_column(column_name: &str) -> bool {
|
||||
matches!(
|
||||
column_name,
|
||||
consts::SEQUENCE_COLUMN_NAME | consts::OP_TYPE_COLUMN_NAME
|
||||
@@ -929,7 +929,7 @@ mod tests {
|
||||
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
|
||||
.enable_version_column(false)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
|
||||
let expect_schema = schema_util::new_schema_ref(
|
||||
@@ -945,7 +945,7 @@ mod tests {
|
||||
assert_eq!(region_name, metadata.name);
|
||||
assert_eq!(expect_schema, *metadata.user_schema());
|
||||
assert_eq!(2, metadata.columns.num_row_key_columns());
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
assert_eq!(1, metadata.columns.num_field_columns());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1100,10 +1100,10 @@ mod tests {
|
||||
.collect();
|
||||
assert_eq!(["k1", "ts"], &row_key_names[..]);
|
||||
// 1 value column
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
assert_eq!(1, metadata.columns.num_field_columns());
|
||||
let value_names: Vec<_> = metadata
|
||||
.columns
|
||||
.iter_value_columns()
|
||||
.iter_field_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
assert_eq!(["v1"], &value_names[..]);
|
||||
@@ -1151,10 +1151,10 @@ mod tests {
|
||||
&row_key_names[..]
|
||||
);
|
||||
// 1 value column
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
assert_eq!(1, metadata.columns.num_field_columns());
|
||||
let value_names: Vec<_> = metadata
|
||||
.columns
|
||||
.iter_value_columns()
|
||||
.iter_field_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
assert_eq!(["v1"], &value_names[..]);
|
||||
@@ -1179,7 +1179,7 @@ mod tests {
|
||||
let builder = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(false)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true));
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true));
|
||||
let last_column_id = builder.last_column_id();
|
||||
let metadata: RegionMetadata = builder.build().try_into().unwrap();
|
||||
|
||||
@@ -1216,9 +1216,9 @@ mod tests {
|
||||
let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(false)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_key_column(("k2", LogicalTypeId::Int32, true))
|
||||
.push_value_column(("v2", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v2", LogicalTypeId::Float32, true))
|
||||
.build()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
@@ -1233,8 +1233,8 @@ mod tests {
|
||||
.enable_version_column(false)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_key_column(("k2", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_value_column(("v2", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v2", LogicalTypeId::Float32, true))
|
||||
.build()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
@@ -1257,7 +1257,7 @@ mod tests {
|
||||
let last_column_id = builder.last_column_id() + 1;
|
||||
let builder: RegionMetadataBuilder = builder
|
||||
.set_last_column_id(last_column_id) // This id is reserved for v1
|
||||
.push_value_column(("v2", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v2", LogicalTypeId::Float32, true))
|
||||
.build()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
@@ -1271,8 +1271,8 @@ mod tests {
|
||||
.enable_version_column(false)
|
||||
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
|
||||
.push_key_column(("k0", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v0", LogicalTypeId::Float32, true))
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true));
|
||||
.push_field_column(("v0", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v1", LogicalTypeId::Float32, true));
|
||||
let last_column_id = builder.last_column_id();
|
||||
let metadata: RegionMetadata = builder.build().try_into().unwrap();
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ use crate::test_util::{self, config_util, schema_util, write_batch_util};
|
||||
pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(enable_version_column)
|
||||
.push_value_column(("v0", LogicalTypeId::Int64, true))
|
||||
.push_field_column(("v0", LogicalTypeId::Int64, true))
|
||||
.build();
|
||||
desc.try_into().unwrap()
|
||||
}
|
||||
@@ -269,7 +269,7 @@ async fn test_new_region() {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.enable_version_column(true)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v0", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v0", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
|
||||
@@ -457,7 +457,7 @@ async fn test_replay_metadata_after_open() {
|
||||
|
||||
let desc = RegionDescBuilder::new(REGION_NAME)
|
||||
.push_key_column(("k1", LogicalTypeId::Int32, false))
|
||||
.push_value_column(("v0", LogicalTypeId::Float32, true))
|
||||
.push_field_column(("v0", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
let metadata: &RegionMetadata = &desc.try_into().unwrap();
|
||||
let mut raw_metadata: RawRegionMetadata = metadata.into();
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::write_batch::WriteBatch;
|
||||
|
||||
/// Create metadata with schema (k0, timestamp, v0, v1)
|
||||
fn new_metadata(region_name: &str) -> RegionMetadata {
|
||||
let desc = descriptor_util::desc_with_value_columns(region_name, 2);
|
||||
let desc = descriptor_util::desc_with_field_columns(region_name, 2);
|
||||
desc.try_into().unwrap()
|
||||
}
|
||||
|
||||
|
||||
@@ -37,13 +37,13 @@ mod tests {
|
||||
new_batch_with_num_values(1)
|
||||
}
|
||||
|
||||
pub(crate) fn new_batch_with_num_values(num_value_columns: usize) -> Batch {
|
||||
pub(crate) fn new_batch_with_num_values(num_field_columns: usize) -> Batch {
|
||||
let k0 = Int64Vector::from_slice([1, 2, 3]);
|
||||
let timestamp = TimestampMillisecondVector::from_vec(vec![4, 5, 6]);
|
||||
|
||||
let mut columns: Vec<VectorRef> = vec![Arc::new(k0), Arc::new(timestamp)];
|
||||
|
||||
for i in 0..num_value_columns {
|
||||
for i in 0..num_field_columns {
|
||||
let vi = Int64Vector::from_slice([i as i64, i as i64, i as i64]);
|
||||
columns.push(Arc::new(vi));
|
||||
}
|
||||
|
||||
@@ -125,10 +125,10 @@ impl ReadAdapter {
|
||||
// `dest_schema` might be projected, so we need to find out value columns that not be read
|
||||
// by the `dest_schema`.
|
||||
|
||||
for (offset, value_column) in source_schema.value_columns().iter().enumerate() {
|
||||
for (offset, field_column) in source_schema.field_columns().iter().enumerate() {
|
||||
// Iterate value columns in source and mark those not in destination as unneeded.
|
||||
if !dest_schema.is_needed(value_column.id()) {
|
||||
is_source_needed[source_schema.value_column_index_by_offset(offset)] = false;
|
||||
if !dest_schema.is_needed(field_column.id()) {
|
||||
is_source_needed[source_schema.field_column_index_by_offset(offset)] = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,7 +204,7 @@ impl ReadAdapter {
|
||||
pub fn batch_from_parts(
|
||||
&self,
|
||||
row_key_columns: Vec<VectorRef>,
|
||||
mut value_columns: Vec<VectorRef>,
|
||||
mut field_columns: Vec<VectorRef>,
|
||||
sequences: VectorRef,
|
||||
op_types: VectorRef,
|
||||
) -> Result<Batch> {
|
||||
@@ -213,8 +213,8 @@ impl ReadAdapter {
|
||||
|
||||
let mut source = row_key_columns;
|
||||
// Reserve space for value, sequence and op_type
|
||||
source.reserve(value_columns.len() + 2);
|
||||
source.append(&mut value_columns);
|
||||
source.reserve(field_columns.len() + 2);
|
||||
source.append(&mut field_columns);
|
||||
// Internal columns are push in sequence, op_type order.
|
||||
source.push(sequences);
|
||||
source.push(op_types);
|
||||
@@ -318,12 +318,12 @@ mod tests {
|
||||
fn call_batch_from_parts(
|
||||
adapter: &ReadAdapter,
|
||||
batch: &Batch,
|
||||
num_value_columns: usize,
|
||||
num_field_columns: usize,
|
||||
) -> Batch {
|
||||
let key = batch.columns()[0..2].to_vec();
|
||||
let value = batch.columns()[2..2 + num_value_columns].to_vec();
|
||||
let sequence = batch.column(2 + num_value_columns).clone();
|
||||
let op_type = batch.column(2 + num_value_columns + 1).clone();
|
||||
let value = batch.columns()[2..2 + num_field_columns].to_vec();
|
||||
let sequence = batch.column(2 + num_field_columns).clone();
|
||||
let op_type = batch.column(2 + num_field_columns + 1).clone();
|
||||
|
||||
adapter
|
||||
.batch_from_parts(key, value, sequence, op_type)
|
||||
@@ -333,9 +333,9 @@ mod tests {
|
||||
fn check_batch_from_parts_without_padding(
|
||||
adapter: &ReadAdapter,
|
||||
batch: &Batch,
|
||||
num_value_columns: usize,
|
||||
num_field_columns: usize,
|
||||
) {
|
||||
let new_batch = call_batch_from_parts(adapter, batch, num_value_columns);
|
||||
let new_batch = call_batch_from_parts(adapter, batch, num_field_columns);
|
||||
assert_eq!(*batch, new_batch);
|
||||
}
|
||||
|
||||
@@ -495,7 +495,7 @@ mod tests {
|
||||
// (k0, timestamp, v0, v1) with version 0.
|
||||
let region_schema_old = Arc::new(schema_util::new_region_schema(0, 2));
|
||||
|
||||
let mut descriptor = descriptor_util::desc_with_value_columns(tests::REGION_NAME, 2);
|
||||
let mut descriptor = descriptor_util::desc_with_field_columns(tests::REGION_NAME, 2);
|
||||
// Assign a much larger column id to v0.
|
||||
descriptor.default_cf.columns[0].id = descriptor.default_cf.columns.last().unwrap().id + 10;
|
||||
let metadata: RegionMetadata = descriptor.try_into().unwrap();
|
||||
|
||||
@@ -91,8 +91,8 @@ impl RegionSchema {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter_value_columns()
|
||||
pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter_field_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -101,8 +101,8 @@ impl RegionSchema {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_value_columns(&self) -> usize {
|
||||
self.columns.num_value_columns()
|
||||
pub fn num_field_columns(&self) -> usize {
|
||||
self.columns.num_field_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -193,10 +193,10 @@ mod tests {
|
||||
assert_eq!(2, region_schema.num_row_key_columns());
|
||||
|
||||
// Checks value column.
|
||||
let mut values = region_schema.value_columns();
|
||||
let mut values = region_schema.field_columns();
|
||||
assert_eq!("v0", values.next().unwrap().desc.name);
|
||||
assert_eq!(None, values.next());
|
||||
assert_eq!(1, region_schema.num_value_columns());
|
||||
assert_eq!(1, region_schema.num_field_columns());
|
||||
|
||||
// Checks version.
|
||||
assert_eq!(123, region_schema.version());
|
||||
|
||||
@@ -181,13 +181,13 @@ impl StoreSchema {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn value_columns(&self) -> &[ColumnMetadata] {
|
||||
pub(crate) fn field_columns(&self) -> &[ColumnMetadata] {
|
||||
&self.columns[self.row_key_end..self.user_column_end]
|
||||
}
|
||||
|
||||
/// Returns the index of the value column according its `offset`.
|
||||
#[inline]
|
||||
pub(crate) fn value_column_index_by_offset(&self, offset: usize) -> usize {
|
||||
pub(crate) fn field_column_index_by_offset(&self, offset: usize) -> usize {
|
||||
self.row_key_end + offset
|
||||
}
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ impl RegionDescBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn push_value_column(mut self, column_def: ColumnDef) -> Self {
|
||||
pub fn push_field_column(mut self, column_def: ColumnDef) -> Self {
|
||||
let column = self.new_column(column_def);
|
||||
self.default_cf_builder = self.default_cf_builder.push_column(column);
|
||||
self
|
||||
@@ -125,12 +125,12 @@ impl RegionDescBuilder {
|
||||
}
|
||||
|
||||
/// Create desc with schema (k0, timestamp, v0, ... vn-1)
|
||||
pub fn desc_with_value_columns(region_name: &str, num_value_columns: usize) -> RegionDescriptor {
|
||||
pub fn desc_with_field_columns(region_name: &str, num_field_columns: usize) -> RegionDescriptor {
|
||||
let mut builder =
|
||||
RegionDescBuilder::new(region_name).push_key_column(("k0", LogicalTypeId::Int64, false));
|
||||
for i in 0..num_value_columns {
|
||||
for i in 0..num_field_columns {
|
||||
let name = format!("v{i}");
|
||||
builder = builder.push_value_column((&name, LogicalTypeId::Int64, true));
|
||||
builder = builder.push_field_column((&name, LogicalTypeId::Int64, true));
|
||||
}
|
||||
builder.build()
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
|
||||
fn new_region_schema() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("read-util")
|
||||
.enable_version_column(false)
|
||||
.push_value_column(("v0", LogicalTypeId::Int64, true))
|
||||
.push_field_column(("v0", LogicalTypeId::Int64, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
metadata.schema().clone()
|
||||
|
||||
@@ -58,9 +58,9 @@ pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>)
|
||||
Arc::new(new_schema(column_defs, timestamp_index))
|
||||
}
|
||||
|
||||
pub fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema {
|
||||
pub fn new_region_schema(version: u32, num_field_columns: usize) -> RegionSchema {
|
||||
let metadata: RegionMetadata =
|
||||
descriptor_util::desc_with_value_columns("REGION_NAME", num_value_columns)
|
||||
descriptor_util::desc_with_field_columns("REGION_NAME", num_field_columns)
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ impl TableMeta {
|
||||
.map(|idx| &columns_schemas[*idx].name)
|
||||
}
|
||||
|
||||
pub fn value_column_names(&self) -> impl Iterator<Item = &String> {
|
||||
pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
|
||||
let columns_schemas = &self.schema.column_schemas();
|
||||
self.value_indices.iter().filter_map(|idx| {
|
||||
let column = &columns_schemas[*idx];
|
||||
|
||||
@@ -7,7 +7,7 @@ DESC TABLE t;
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | VALUE |
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
@@ -46,7 +46,7 @@ DESC TABLE new_table;
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | VALUE |
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ DESC TABLE test1;
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | VALUE |
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
@@ -68,7 +68,7 @@ DESC TABLE test2;
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | VALUE |
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
@@ -99,7 +99,7 @@ DESC TABLE test_pk;
|
||||
+-----------+---------+------+---------+---------------+
|
||||
| timestamp | Int64 | NO | | TIME INDEX |
|
||||
| host | String | YES | | PRIMARY KEY |
|
||||
| value | Float64 | YES | | VALUE |
|
||||
| value | Float64 | YES | | FIELD |
|
||||
+-----------+---------+------+---------+---------------+
|
||||
|
||||
DROP TABLE test_pk;
|
||||
|
||||
Reference in New Issue
Block a user