Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-03 20:02:54 +00:00)
chore: decide tag column in log api follow table schema if table exists (#5138)
* chore: decide tag column in log api follow table schema if table exists
* chore: add more test for greptime_identity pipeline
* chore: change pipeline get_table function signature
* chore: change identity_pipeline_inner tag_column_names type
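In short: the HTTP log ingest path now looks up the target table before running the greptime_identity pipeline and hands it to identity_pipeline, so the table's existing row-key columns are marked as Tag columns instead of everything defaulting to Field. A minimal sketch of the new call shape (names taken from the diff below; error handling and surrounding setup are elided):

    // Option<Arc<Table>>: None when the table does not exist yet.
    let table = state.get_table(&table_name, &query_ctx).await?;
    // With Some(table), its row-key columns become Tag columns in the generated rows;
    // with None, every column keeps the default Field semantic type.
    let rows = pipeline::identity_pipeline(pipeline_data, table)?;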
@@ -25,8 +25,9 @@ use servers::error::{
 };
 use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
 use servers::query_handler::PipelineHandler;
-use session::context::QueryContextRef;
+use session::context::{QueryContext, QueryContextRef};
 use snafu::ResultExt;
+use table::Table;
 
 use crate::instance::Instance;
 
@@ -84,6 +85,18 @@ impl PipelineHandler for Instance {
         .await
         .context(PipelineSnafu)
     }
+
+    async fn get_table(
+        &self,
+        table: &str,
+        query_ctx: &QueryContext,
+    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
+        let catalog = query_ctx.current_catalog();
+        let schema = query_ctx.current_schema();
+        self.catalog_manager
+            .table(catalog, &schema, table, None)
+            .await
+    }
 }
 
 impl Instance {
@@ -15,6 +15,7 @@
 pub mod coerce;
 
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use ahash::HashMap;
 use api::helper::proto_value_type;
@@ -367,20 +368,15 @@ fn json_value_to_row(
     Ok(Row { values: row })
 }
 
-/// Identity pipeline for Greptime
-/// This pipeline will convert the input JSON array to Greptime Rows
-/// 1. The pipeline will add a default timestamp column to the schema
-/// 2. The pipeline not resolve NULL value
-/// 3. The pipeline assumes that the json format is fixed
-/// 4. The pipeline will return an error if the same column datatype is mismatched
-/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
-pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
+fn identity_pipeline_inner<'a>(
+    array: Vec<serde_json::Value>,
+    tag_column_names: Option<impl Iterator<Item = &'a String>>,
+) -> Result<Rows> {
     let mut rows = Vec::with_capacity(array.len());
-
-    let mut schema = SchemaInfo::default();
+    let mut schema_info = SchemaInfo::default();
     for value in array {
         if let serde_json::Value::Object(map) = value {
-            let row = json_value_to_row(&mut schema, map)?;
+            let row = json_value_to_row(&mut schema_info, map)?;
             rows.push(row);
         }
     }
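Note that tag_column_names is an Option<impl Iterator<Item = &'a String>>, so a bare None at a call site leaves the iterator type unconstrained and the caller has to name some concrete iterator type; that is why the public wrapper further below passes None::<std::iter::Empty<&String>>. A small self-contained sketch of the same pattern (take_names is illustrative and not part of this change):

    fn take_names<'a>(names: Option<impl Iterator<Item = &'a String>>) -> usize {
        names.map(|it| it.count()).unwrap_or(0)
    }

    fn main() {
        let tags = vec!["host".to_string(), "region".to_string()];
        // Some(...) lets the compiler infer the iterator type from the argument.
        assert_eq!(take_names(Some(tags.iter())), 2);
        // With None there is nothing to infer from, so a concrete (empty) iterator
        // type must be named, just as identity_pipeline does below.
        assert_eq!(take_names(None::<std::iter::Empty<&String>>), 0);
    }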
@@ -395,7 +391,7 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
     let ts = GreptimeValue {
         value_data: Some(ValueData::TimestampNanosecondValue(ns)),
     };
-    let column_count = schema.schema.len();
+    let column_count = schema_info.schema.len();
     for row in rows.iter_mut() {
         let diff = column_count - row.values.len();
         for _ in 0..diff {
@@ -403,15 +399,49 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
         }
         row.values.push(ts.clone());
     }
-    schema.schema.push(greptime_timestamp_schema);
+    schema_info.schema.push(greptime_timestamp_schema);
+
+    // set the semantic type of the row key column to Tag
+    if let Some(tag_column_names) = tag_column_names {
+        tag_column_names.for_each(|tag_column_name| {
+            if let Some(index) = schema_info.index.get(tag_column_name) {
+                schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
+            }
+        });
+    }
     Ok(Rows {
-        schema: schema.schema,
+        schema: schema_info.schema,
         rows,
     })
 }
 
+/// Identity pipeline for Greptime
+/// This pipeline will convert the input JSON array to Greptime Rows
+/// params table is used to set the semantic type of the row key column to Tag
+/// 1. The pipeline will add a default timestamp column to the schema
+/// 2. The pipeline not resolve NULL value
+/// 3. The pipeline assumes that the json format is fixed
+/// 4. The pipeline will return an error if the same column datatype is mismatched
+/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
+pub fn identity_pipeline(
+    array: Vec<serde_json::Value>,
+    table: Option<Arc<table::Table>>,
+) -> Result<Rows> {
+    match table {
+        Some(table) => {
+            let table_info = table.table_info();
+            let tag_column_names = table_info.meta.row_key_column_names();
+            identity_pipeline_inner(array, Some(tag_column_names))
+        }
+        None => identity_pipeline_inner(array, None::<std::iter::Empty<&String>>),
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use api::v1::SemanticType;
+
+    use crate::etl::transform::transformer::greptime::identity_pipeline_inner;
     use crate::identity_pipeline;
 
     #[test]
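As a usage sketch of the new entry point (the JSON input is illustrative and the test below is hypothetical, not part of this change): without a table, no column is promoted to Tag, the per-record schemas are merged, and the default timestamp column is appended:

    #[test]
    fn identity_pipeline_without_table_merges_schema() {
        let array = vec![
            serde_json::json!({"host": "web-1", "latency_ms": 12.5}),
            serde_json::json!({"host": "web-2", "latency_ms": 8.0, "status": 200}),
        ];
        // No table is passed, so every column keeps the Field semantic type.
        let rows = identity_pipeline(array, None).unwrap();
        // Merged schema: host, latency_ms, status, plus the appended timestamp column.
        assert_eq!(rows.schema.len(), 4);
    }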
@@ -437,7 +467,7 @@ mod tests {
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
@@ -465,7 +495,7 @@
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
@@ -493,7 +523,7 @@
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_ok());
         let rows = rows.unwrap();
         assert_eq!(rows.schema.len(), 8);
@@ -501,5 +531,58 @@
         assert_eq!(8, rows.rows[0].values.len());
         assert_eq!(8, rows.rows[1].values.len());
     }
+    {
+        let array = vec![
+            serde_json::json!({
+                "woshinull": null,
+                "name": "Alice",
+                "age": 20,
+                "is_student": true,
+                "score": 99.5,
+                "hobbies": "reading",
+                "address": "Beijing",
+            }),
+            serde_json::json!({
+                "name": "Bob",
+                "age": 21,
+                "is_student": false,
+                "score": 88.5,
+                "hobbies": "swimming",
+                "address": "Shanghai",
+                "gaga": "gaga"
+            }),
+        ];
+        let tag_column_names = ["name".to_string(), "address".to_string()];
+        let rows = identity_pipeline_inner(array, Some(tag_column_names.iter()));
+        assert!(rows.is_ok());
+        let rows = rows.unwrap();
+        assert_eq!(rows.schema.len(), 8);
+        assert_eq!(rows.rows.len(), 2);
+        assert_eq!(8, rows.rows[0].values.len());
+        assert_eq!(8, rows.rows[1].values.len());
+        assert_eq!(
+            rows.schema
+                .iter()
+                .find(|x| x.column_name == "name")
+                .unwrap()
+                .semantic_type,
+            SemanticType::Tag as i32
+        );
+        assert_eq!(
+            rows.schema
+                .iter()
+                .find(|x| x.column_name == "address")
+                .unwrap()
+                .semantic_type,
+            SemanticType::Tag as i32
+        );
+        assert_eq!(
+            rows.schema
+                .iter()
+                .filter(|x| x.semantic_type == SemanticType::Tag as i32)
+                .count(),
+            2
+        );
+    }
 }
 }
@@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
-    PipelineSnafu, Result, UnsupportedContentTypeSnafu,
+    CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
+    ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
 };
 use crate::http::extractor::LogTableName;
 use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
@@ -612,10 +612,15 @@ async fn ingest_logs_inner(
     let mut results = Vec::with_capacity(pipeline_data.len());
     let transformed_data: Rows;
     if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
-        let rows = pipeline::identity_pipeline(pipeline_data)
+        let table = state
+            .get_table(&table_name, &query_ctx)
+            .await
+            .context(CatalogSnafu)?;
+        let rows = pipeline::identity_pipeline(pipeline_data, table)
             .context(PipelineTransformSnafu)
             .context(PipelineSnafu)?;
-        transformed_data = rows;
+
+        transformed_data = rows;
     } else {
         let pipeline = state
             .get_pipeline(&pipeline_name, version, query_ctx.clone())
@@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
 use serde_json::Value;
-use session::context::QueryContextRef;
+use session::context::{QueryContext, QueryContextRef};
 
 use crate::error::Result;
 use crate::influxdb::InfluxdbRequest;
@@ -164,4 +164,10 @@ pub trait PipelineHandler {
         version: PipelineVersion,
         query_ctx: QueryContextRef,
     ) -> Result<Option<()>>;
+
+    async fn get_table(
+        &self,
+        table: &str,
+        query_ctx: &QueryContext,
+    ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
 }