fix: do not accommodate fields for multi-value protocol (#6237)

This commit is contained in:
Yingwen
2025-06-04 09:10:52 +08:00
committed by GitHub
parent 69975f1f71
commit ee4f830be6
10 changed files with 100 additions and 23 deletions

View File

@@ -596,7 +596,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -76,7 +76,7 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false)
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
@@ -420,6 +420,7 @@ impl Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
@@ -427,6 +428,7 @@ impl Instance {
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.await
.context(TableOperationSnafu)
@@ -439,7 +441,14 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may writes multiple fields (values).
false,
)
.await
.context(TableOperationSnafu)
}

View File

@@ -52,8 +52,9 @@ impl OpentsdbProtocolHandler for Instance {
None
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true)
.handle_row_inserts(requests, ctx, true, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

View File

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx, false)
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone(), true)
self.handle_row_inserts(request, ctx.clone(), true, true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?

View File

@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();
self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
.await
.context(TableOperationSnafu)?;

View File

@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
.await
}
@@ -158,6 +158,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -166,6 +167,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -183,6 +185,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Log,
false,
false,
)
.await
}
@@ -199,6 +202,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::Trace,
false,
false,
)
.await
}
@@ -210,6 +214,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
@@ -217,6 +222,7 @@ impl Inserter {
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
is_single_value,
)
.await
}
@@ -229,6 +235,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -249,6 +256,7 @@ impl Inserter {
create_type,
statement_executor,
accommodate_existing_schema,
is_single_value,
)
.await?;
@@ -299,6 +307,7 @@ impl Inserter {
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
true,
)
.await?;
let name_to_info = table_infos
@@ -464,9 +473,10 @@ impl Inserter {
/// This mapping is used in the conversion of RowToRegion.
///
/// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
/// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
/// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
/// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
/// remote write. This will modify the `RowInsertRequests` in place.
/// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
async fn create_or_alter_tables_on_demand(
&self,
requests: &mut RowInsertRequests,
@@ -474,6 +484,7 @@ impl Inserter {
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
@@ -537,6 +548,7 @@ impl Inserter {
&table,
ctx,
accommodate_existing_schema,
is_single_value,
)? {
alter_tables.push(alter_expr);
}
@@ -815,12 +827,15 @@ impl Inserter {
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
/// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
/// for more details.
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
/// input `req`.
fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -838,18 +853,20 @@ impl Inserter {
let table_schema = table.schema();
// Find timestamp column name
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
// Find field column name if there is only one
// Find field column name if there is only one and `is_single_value` is true.
let mut field_col_name = None;
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
if is_single_value {
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
}
});
if multiple_field_cols {
field_col_name = None;
}
});
if multiple_field_cols {
field_col_name = None;
}
// Update column name in request schema for Timestamp/Field columns
@@ -875,11 +892,11 @@ impl Inserter {
}
}
// Remove from add_columns any column that is timestamp or field (if there is only one field column)
// Only keep columns that are tags or non-single field.
add_columns.add_columns.retain(|col| {
let def = col.column_def.as_ref().unwrap();
def.semantic_type != SemanticType::Timestamp as i32
&& (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
def.semantic_type == SemanticType::Tag as i32
|| (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
});
if add_columns.add_columns.is_empty() {
@@ -1231,7 +1248,7 @@ mod tests {
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
assert!(alter_expr.is_none());

View File

@@ -236,6 +236,7 @@ impl PipelineTable {
Self::query_ctx(&table_info),
&self.statement_executor,
false,
false,
)
.await
.context(InsertPipelineSnafu)?;

View File

@@ -457,6 +457,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
))
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
.with_logs_handler(instance.fe_instance().clone())
.with_influxdb_handler(instance.fe_instance().clone())
.with_otlp_handler(instance.fe_instance().clone())
.with_jaeger_handler(instance.fe_instance().clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());

View File

@@ -117,6 +117,8 @@ macro_rules! http_tests {
test_log_query,
test_jaeger_query_api,
test_jaeger_query_api_for_trace_v1,
test_influxdb_write,
);
)*
};
@@ -4592,6 +4594,52 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_influxdb_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_influxdb_write").await;
let client = TestClient::new(app).await;
// Only write field cpu.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Only write field mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 mem=10240.0 1664370469457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Write field cpu & mem.
let result = client
.post("/v1/influxdb/write?db=public&p=greptime&u=greptime")
.body("test_alter,host=host1 cpu=3.2,mem=20480.0 1664370479457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let expected = r#"[["host1",1.2,1664370459457010101,null],["host1",null,1664370469457010101,10240.0],["host1",3.2,1664370479457010101,20480.0]]"#;
validate_data(
"test_influxdb_write",
&client,
"select * from test_alter order by ts;",
expected,
)
.await;
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())