feat(log-query): try infer and cast type for literal value (#6712)

* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* one more test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplicated test cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplicated methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* one more test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplicated test cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplicated methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add eq for log query

* skip for both literals

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: paomian <xpaomian@gmail.com>
This commit is contained in:
Ruihang Xia
2025-08-12 23:28:37 -07:00
committed by GitHub
parent f0bec4940f
commit ccccaf7734
2 changed files with 419 additions and 36 deletions

View File

@@ -328,6 +328,42 @@ pub struct ColumnFilters {
pub filters: Vec<ContentFilter>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EqualValue {
/// Exact match with a string value.
String(String),
/// Exact match with a boolean value.
Boolean(bool),
/// Exact match with a number value.
Int(i64),
/// Exact match with a float value.
Float(f64),
}
impl From<String> for EqualValue {
fn from(value: String) -> Self {
EqualValue::String(value)
}
}
impl From<bool> for EqualValue {
fn from(value: bool) -> Self {
EqualValue::Boolean(value)
}
}
impl From<i64> for EqualValue {
fn from(value: i64) -> Self {
EqualValue::Int(value)
}
}
impl From<f64> for EqualValue {
fn from(value: f64) -> Self {
EqualValue::Float(value)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ContentFilter {
// Search-based filters
@@ -366,6 +402,7 @@ pub enum ContentFilter {
In(Vec<String>),
IsTrue,
IsFalse,
Equal(EqualValue),
// Compound filters
Compound(Vec<ContentFilter>, ConjunctionOperator),

View File

@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_schema::Schema as ArrowSchema;
use arrow_schema::{DataType, Schema as ArrowSchema};
use catalog::table_source::DfTableSourceProvider;
use common_function::utils::escape_like_pattern;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::SessionState;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::utils::{conjunction, disjunction};
use datafusion_expr::{col, lit, not, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
use datafusion_expr::{
col, lit, not, BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter};
use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
@@ -193,7 +195,7 @@ impl LogQueryPlanner {
.filters
.iter()
.filter_map(|filter| {
self.build_content_filter_with_expr(col_expr.clone(), filter)
self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema)
.transpose()
})
.try_collect::<Vec<_>>()?;
@@ -212,6 +214,7 @@ impl LogQueryPlanner {
&self,
col_expr: Expr,
filter: &log_query::ContentFilter,
schema: &DFSchema,
) -> Result<Option<Expr>> {
match filter {
log_query::ContentFilter::Exact(value) => Ok(Some(
@@ -237,55 +240,56 @@ impl LogQueryPlanner {
start_inclusive,
end_inclusive,
} => {
let start_literal = self.create_inferred_literal(start, &col_expr, schema);
let end_literal = self.create_inferred_literal(end, &col_expr, schema);
let left = if *start_inclusive {
col_expr
.clone()
.gt_eq(lit(ScalarValue::Utf8(Some(start.clone()))))
col_expr.clone().gt_eq(start_literal)
} else {
col_expr
.clone()
.gt(lit(ScalarValue::Utf8(Some(start.clone()))))
col_expr.clone().gt(start_literal)
};
let right = if *end_inclusive {
col_expr.lt_eq(lit(ScalarValue::Utf8(Some(end.clone()))))
col_expr.lt_eq(end_literal)
} else {
col_expr.lt(lit(ScalarValue::Utf8(Some(end.clone()))))
col_expr.lt(end_literal)
};
Ok(Some(left.and(right)))
}
log_query::ContentFilter::GreatThan { value, inclusive } => {
let value_literal = self.create_inferred_literal(value, &col_expr, schema);
let comparison_expr = if *inclusive {
col_expr.gt_eq(lit(ScalarValue::Utf8(Some(value.clone()))))
col_expr.gt_eq(value_literal)
} else {
col_expr.gt(lit(ScalarValue::Utf8(Some(value.clone()))))
col_expr.gt(value_literal)
};
Ok(Some(comparison_expr))
}
log_query::ContentFilter::LessThan { value, inclusive } => {
let value_literal = self.create_inferred_literal(value, &col_expr, schema);
if *inclusive {
Ok(Some(
col_expr.lt_eq(lit(ScalarValue::Utf8(Some(value.clone())))),
))
Ok(Some(col_expr.lt_eq(value_literal)))
} else {
Ok(Some(
col_expr.lt(lit(ScalarValue::Utf8(Some(value.clone())))),
))
Ok(Some(col_expr.lt(value_literal)))
}
}
log_query::ContentFilter::In(values) => {
let values: Vec<_> = values
let inferred_values: Vec<_> = values
.iter()
.map(|v| lit(ScalarValue::Utf8(Some(v.clone()))))
.map(|v| self.create_inferred_literal(v, &col_expr, schema))
.collect();
Ok(Some(col_expr.in_list(values, false)))
Ok(Some(col_expr.in_list(inferred_values, false)))
}
log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
log_query::ContentFilter::Equal(value) => {
let value_literal = Self::create_eq_literal(value.clone());
Ok(Some(col_expr.eq(value_literal)))
}
log_query::ContentFilter::Compound(filters, op) => {
let exprs = filters
.iter()
.filter_map(|filter| {
self.build_content_filter_with_expr(col_expr.clone(), filter)
self.build_content_filter_with_expr(col_expr.clone(), filter, schema)
.transpose()
})
.try_collect::<Vec<_>>()?;
@@ -339,15 +343,8 @@ impl LogQueryPlanner {
LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
LogExpr::BinaryOp { left, op, right } => {
let left_expr = self.log_expr_to_df_expr(left, schema)?;
let right_expr = self.log_expr_to_df_expr(right, schema)?;
let df_op = Self::binary_operator_to_df_operator(op);
Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(left_expr),
op: df_op,
right: Box::new(right_expr),
}))
// For binary operations, always use type inference (matches original behavior)
self.build_binary_expr(left, op, right, schema)
}
LogExpr::ScalarFunc { name, args, alias } => {
self.build_scalar_func(schema, name, args, alias)
@@ -410,6 +407,77 @@ impl LogQueryPlanner {
}
}
/// Parse a string literal to the appropriate ScalarValue based on target DataType.
/// Falls back to UTF8 if parsing fails or type is not supported.
fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue {
let utf8_literal = ScalarValue::Utf8(Some(literal.to_string()));
utf8_literal.cast_to(target_type).unwrap_or(utf8_literal)
}
/// Build binary expression with type inference for literals.
/// Attempts to infer literal types from the non-literal operand's type.
fn build_binary_expr(
&self,
left: &LogExpr,
op: &BinaryOperator,
right: &LogExpr,
schema: &DFSchema,
) -> Result<Expr> {
// Convert both sides to DataFusion expressions first
let mut left_expr = self.log_expr_to_df_expr(left, schema)?;
let mut right_expr = self.log_expr_to_df_expr(right, schema)?;
// Try to infer literal types based on the other operand
match (left, right) {
(LogExpr::Literal(_), LogExpr::Literal(_)) => {
// both are literal, do nothing
}
(LogExpr::Literal(literal), _) => {
// Left is literal, try to infer from right
if let Ok(right_type) = right_expr.get_type(schema) {
let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type);
left_expr = lit(inferred_scalar);
}
}
(_, LogExpr::Literal(literal)) => {
// Right is literal, try to infer from left
if let Ok(left_type) = left_expr.get_type(schema) {
let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type);
right_expr = lit(inferred_scalar);
}
}
_ => {
// Neither is a simple literal, no type inference needed
}
}
let df_op = Self::binary_operator_to_df_operator(op);
Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(left_expr),
op: df_op,
right: Box::new(right_expr),
}))
}
/// Create a type-inferred literal based on the provided expression's type.
/// Falls back to UTF8 if type inference fails.
fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr {
if let Ok(expr_type) = expr.get_type(schema) {
lit(self.infer_literal_scalar_value(value, &expr_type))
} else {
lit(ScalarValue::Utf8(Some(value.to_string())))
}
}
fn create_eq_literal(value: EqualValue) -> Expr {
match value {
EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))),
EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))),
EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))),
EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))),
}
}
/// Process LogExpr recursively.
///
/// Return the [`LogicalPlanBuilder`] after modification and the resulting expression's names.
@@ -532,15 +600,69 @@ mod tests {
Arc::new(Schema::new(columns))
}
fn mock_schema_with_typed_columns() -> SchemaRef {
let columns = vec![
ColumnSchema::new(
"message".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"host".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"is_active".to_string(),
ConcreteDataType::boolean_datatype(),
true,
),
// Add more typed columns for comprehensive testing
ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"score".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
ColumnSchema::new(
"count".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
];
Arc::new(Schema::new(columns))
}
/// Registers table under `greptime`, with `message`, `timestamp`, `host`, and `is_active` columns.
async fn build_test_table_provider(
table_name_tuples: &[(String, String)],
) -> DfTableSourceProvider {
build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await
}
/// Registers table under `greptime`, with typed columns for type inference tests.
async fn build_test_table_provider_with_typed_columns(
table_name_tuples: &[(String, String)],
) -> DfTableSourceProvider {
build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns())
.await
}
async fn build_test_table_provider_with_schema(
table_name_tuples: &[(String, String)],
schema: SchemaRef,
) -> DfTableSourceProvider {
let catalog_list = MemoryCatalogManager::with_default_setup();
for (schema_name, table_name) in table_name_tuples {
let schema = mock_schema();
let table_meta = TableMetaBuilder::empty()
.schema(schema)
.schema(schema.clone())
.primary_key_indices(vec![2])
.value_indices(vec![0])
.next_column_id(1024)
@@ -1198,8 +1320,232 @@ mod tests {
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr_string = "character_length(message) > Utf8(\"100\") IS TRUE";
let expected_expr_string = "character_length(message) > Int32(100) IS TRUE";
assert_eq!(format!("{}", expr), expected_expr_string);
}
#[tokio::test]
async fn test_type_inference_float_comparison() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
// Test Between with float column and string literals
let column_filter = ColumnFilters {
expr: Box::new(LogExpr::NamedIdent("score".to_string())),
filters: vec![ContentFilter::Between {
start: "75.5".to_string(),
end: "100.0".to_string(),
start_inclusive: true,
end_inclusive: false,
}],
};
let expr_option = planner
.build_column_filter(&column_filter, schema.arrow_schema())
.unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
// Should infer literals as Float64 since score is a float64 column
let expected_expr = col("score")
.gt_eq(lit(ScalarValue::Float64(Some(75.5))))
.and(col("score").lt(lit(ScalarValue::Float64(Some(100.0)))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_type_inference_boolean_comparison() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
// Test In filter with boolean column and string literals
let column_filter = ColumnFilters {
expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
filters: vec![ContentFilter::In(vec![
"true".to_string(),
"1".to_string(),
"false".to_string(),
])],
};
let expr_option = planner
.build_column_filter(&column_filter, schema.arrow_schema())
.unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
// Should infer string literals as boolean values
let expected_expr = col("is_active").in_list(
vec![
lit(ScalarValue::Boolean(Some(true))),
lit(ScalarValue::Boolean(Some(true))),
lit(ScalarValue::Boolean(Some(false))),
],
false,
);
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_fallback_to_utf8_on_parse_failure() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
// Test with invalid number format - should fallback to UTF8
let column_filter = ColumnFilters {
expr: Box::new(LogExpr::NamedIdent("age".to_string())),
filters: vec![ContentFilter::GreatThan {
value: "not_a_number".to_string(),
inclusive: false,
}],
};
let expr_option = planner
.build_column_filter(&column_filter, schema.arrow_schema())
.unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
// Should fallback to UTF8 since "not_a_number" can't be parsed as int32
let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_string_column_remains_utf8() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
// Test with string column - should remain UTF8 even if value looks like a number
let column_filter = ColumnFilters {
expr: Box::new(LogExpr::NamedIdent("message".to_string())),
filters: vec![ContentFilter::GreatThan {
value: "123".to_string(),
inclusive: false,
}],
};
let expr_option = planner
.build_column_filter(&column_filter, schema.arrow_schema())
.unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
// Should remain UTF8 since message is a string column
let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_all_binary_operators() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
// Test all comparison operators
let test_cases = vec![
(BinaryOperator::Eq, Operator::Eq),
(BinaryOperator::Ne, Operator::NotEq),
(BinaryOperator::Lt, Operator::Lt),
(BinaryOperator::Le, Operator::LtEq),
(BinaryOperator::Gt, Operator::Gt),
(BinaryOperator::Ge, Operator::GtEq),
(BinaryOperator::Plus, Operator::Plus),
(BinaryOperator::Minus, Operator::Minus),
(BinaryOperator::Multiply, Operator::Multiply),
(BinaryOperator::Divide, Operator::Divide),
(BinaryOperator::Modulo, Operator::Modulo),
(BinaryOperator::And, Operator::And),
(BinaryOperator::Or, Operator::Or),
];
for (binary_op, expected_df_op) in test_cases {
let binary_expr = LogExpr::BinaryOp {
left: Box::new(LogExpr::NamedIdent("age".to_string())),
op: binary_op,
right: Box::new(LogExpr::Literal("25".to_string())),
};
let expr = planner
.log_expr_to_df_expr(&binary_expr, &df_schema)
.unwrap();
let expected_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col("age")),
op: expected_df_op,
right: Box::new(lit(ScalarValue::Int32(Some(25)))),
});
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
}
#[tokio::test]
async fn test_nested_binary_operations() {
let table_provider = build_test_table_provider_with_typed_columns(&[(
"public".to_string(),
"test_table".to_string(),
)])
.await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let schema = mock_schema_with_typed_columns();
let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
// Test nested binary operations: (age + 5) > 30
let nested_binary_expr = LogExpr::BinaryOp {
left: Box::new(LogExpr::BinaryOp {
left: Box::new(LogExpr::NamedIdent("age".to_string())),
op: BinaryOperator::Plus,
right: Box::new(LogExpr::Literal("5".to_string())),
}),
op: BinaryOperator::Gt,
right: Box::new(LogExpr::Literal("30".to_string())),
};
let expr = planner
.log_expr_to_df_expr(&nested_binary_expr, &df_schema)
.unwrap();
// Verify the nested structure is properly created
let expected_expr_debug = "BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: \"age\" }), op: Plus, right: Literal(Int32(5)) }), op: Gt, right: Literal(Int32(30)) })";
assert_eq!(format!("{:?}", expr), expected_expr_debug);
}
}