diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 59ed14eb25..6fb2ab938c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,6 +17,6 @@ repos: - id: fmt - id: clippy args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"] - stages: [push] + stages: [pre-push] - id: cargo-check args: ["--workspace", "--all-targets", "--all-features"] diff --git a/Cargo.lock b/Cargo.lock index d5bfbf9530..d0f9464054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6193,6 +6193,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "loki-api" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "674883a98273598ac3aad4301724c56734bea90574c5033af067e8f9fb5eb399" +dependencies = [ + "prost 0.12.6", + "prost-types 0.12.6", +] + [[package]] name = "lrlex" version = "0.13.7" @@ -10988,8 +10998,10 @@ dependencies = [ "hyper 0.14.30", "influxdb_line_protocol", "itertools 0.10.5", + "json5", "jsonb", "lazy_static", + "loki-api", "mime_guess", "mysql_async", "notify", @@ -12387,6 +12399,7 @@ dependencies = [ "futures-util", "hex", "itertools 0.10.5", + "loki-api", "meta-client", "meta-srv", "moka", diff --git a/src/cmd/src/cli/database.rs b/src/cmd/src/cli/database.rs index d313e93acf..a4eab99826 100644 --- a/src/cmd/src/cli/database.rs +++ b/src/cmd/src/cli/database.rs @@ -19,8 +19,8 @@ use base64::Engine; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use humantime::format_duration; use serde_json::Value; -use servers::http::greptime_result_v1::GreptimedbV1Response; use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT; +use servers::http::result::greptime_result_v1::GreptimedbV1Response; use servers::http::GreptimeQueryOutput; use snafu::ResultExt; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 47bbfc4f73..bdf62d18c1 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -63,8 +63,10 @@ humantime-serde.workspace = true hyper = { version = "0.14", features = ["full"] } influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } itertools.workspace = true +json5 = "0.4" jsonb.workspace = true lazy_static.workspace = true +loki-api = "0.1" mime_guess = "2.0" notify.workspace = true object-pool = "0.5" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4ac143be3d..3109e6e8c5 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::net::SocketAddr; use std::string::FromUtf8Error; +use axum::http::StatusCode as HttpStatusCode; use axum::response::{IntoResponse, Response}; use axum::{http, Json}; use base64::DecodeError; @@ -30,8 +31,6 @@ use query::parser::PromQuery; use serde_json::json; use snafu::{Location, Snafu}; -use crate::http::error_result::status_code_to_http_status; - #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -499,6 +498,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse payload as json5"))] + ParseJson5 { + #[snafu(source)] + error: json5::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unsupported content type: {:?}", content_type))] UnsupportedContentType { content_type: ContentType, @@ -631,6 +638,7 @@ impl ErrorExt for Error { | MissingQueryContext { .. } | MysqlValueConversion { .. } | ParseJson { .. } + | ParseJson5 { .. } | UnsupportedContentType { .. } | TimestampOverflow { .. } | OpenTelemetryLog { .. 
} @@ -719,3 +727,49 @@ impl IntoResponse for Error { (status, body).into_response() } } + +pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode { + match status_code { + StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK, + + StatusCode::Unsupported + | StatusCode::InvalidArguments + | StatusCode::InvalidSyntax + | StatusCode::RequestOutdated + | StatusCode::RegionAlreadyExists + | StatusCode::TableColumnExists + | StatusCode::TableAlreadyExists + | StatusCode::RegionNotFound + | StatusCode::DatabaseNotFound + | StatusCode::TableNotFound + | StatusCode::TableColumnNotFound + | StatusCode::PlanQuery + | StatusCode::DatabaseAlreadyExists + | StatusCode::FlowNotFound + | StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST, + + StatusCode::AuthHeaderNotFound + | StatusCode::InvalidAuthHeader + | StatusCode::UserNotFound + | StatusCode::UnsupportedPasswordType + | StatusCode::UserPasswordMismatch + | StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED, + + StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN, + + StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS, + + StatusCode::RegionNotReady + | StatusCode::TableUnavailable + | StatusCode::RegionBusy + | StatusCode::StorageUnavailable + | StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE, + + StatusCode::Internal + | StatusCode::Unexpected + | StatusCode::IllegalState + | StatusCode::Unknown + | StatusCode::RuntimeResourcesExhausted + | StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR, + } +} diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 34cff5de6f..c3cd977242 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -50,20 +50,20 @@ use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; use self::authorize::AuthState; -use self::table_result::TableResponse; +use self::result::table_result::TableResponse; use crate::configurator::ConfiguratorRef; use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu}; -use crate::http::arrow_result::ArrowResponse; -use crate::http::csv_result::CsvResponse; -use crate::http::error_result::ErrorResponse; -use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; -use crate::http::influxdb_result_v1::InfluxdbV1Response; -use crate::http::json_result::JsonResponse; use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query, range_query, series_query, }; +use crate::http::result::arrow_result::ArrowResponse; +use crate::http::result::csv_result::CsvResponse; +use crate::http::result::error_result::ErrorResponse; +use crate::http::result::greptime_result_v1::GreptimedbV1Response; +use crate::http::result::influxdb_result_v1::InfluxdbV1Response; +use crate::http::result::json_result::JsonResponse; use crate::interceptor::LogIngestInterceptorRef; use crate::metrics::http_metrics_layer; use crate::metrics_handler::MetricsHandler; @@ -76,8 +76,11 @@ use crate::query_handler::{ use crate::server::Server; pub mod authorize; +#[cfg(feature = "dashboard")] +mod dashboard; pub mod dyn_log; pub mod event; +mod extractor; pub mod handler; pub mod header; pub mod influxdb; @@ -87,19 +90,8 @@ pub mod otlp; pub mod pprof; pub mod prom_store; pub mod prometheus; -mod prometheus_resp; +pub mod result; pub mod script; - -pub 
mod arrow_result;
-pub mod csv_result;
-#[cfg(feature = "dashboard")]
-mod dashboard;
-pub mod error_result;
-pub mod greptime_manage_resp;
-pub mod greptime_result_v1;
-pub mod influxdb_result_v1;
-pub mod json_result;
-pub mod table_result;
 mod timeout;
 
 pub(crate) use timeout::DynamicTimeoutLayer;
@@ -621,17 +613,22 @@ impl HttpServerBuilder {
         validator: Option<LogValidatorRef>,
         ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
     ) -> Self {
-        Self {
-            router: self.router.nest(
-                &format!("/{HTTP_API_VERSION}/events"),
-                HttpServer::route_log(LogState {
-                    log_handler: handler,
-                    log_validator: validator,
-                    ingest_interceptor,
-                }),
-            ),
-            ..self
-        }
+        let log_state = LogState {
+            log_handler: handler,
+            log_validator: validator,
+            ingest_interceptor,
+        };
+        let router = self.router.nest(
+            &format!("/{HTTP_API_VERSION}/events"),
+            HttpServer::route_log(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/loki"),
+            HttpServer::route_loki(log_state),
+        );
+
+        Self { router, ..self }
     }
 
     pub fn with_plugins(self, plugins: Plugins) -> Self {
@@ -768,6 +765,12 @@ impl HttpServer {
             .with_state(metrics_handler)
     }
 
+    fn route_loki<S>(log_state: LogState) -> Router<S> {
+        Router::new()
+            .route("/api/v1/push", routing::post(event::loki_ingest))
+            .with_state(log_state)
+    }
+
     fn route_log<S>(log_state: LogState) -> Router<S> {
         Router::new()
             .route("/logs", routing::post(event::log_ingester))
diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs
index 7578898c54..7aee9fa9b4 100644
--- a/src/servers/src/http/authorize.rs
+++ b/src/servers/src/http/authorize.rs
@@ -36,7 +36,7 @@ use crate::error::{
     self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
     NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
 };
-use crate::http::error_result::ErrorResponse;
+use crate::http::result::error_result::ErrorResponse;
 use crate::http::HTTP_API_PREFIX;
 use crate::influxdb::{is_influxdb_request, is_influxdb_v2_request};
 
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 80f15ff211..7f227bdc6f 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -12,11 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{BTreeMap, HashMap}; use std::result::Result as StdResult; use std::sync::Arc; use std::time::Instant; -use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, + Value as GreptimeValue, +}; use axum::body::HttpBody; use axum::extract::{FromRequest, Multipart, Path, Query, State}; use axum::headers::ContentType; @@ -24,12 +29,17 @@ use axum::http::header::CONTENT_TYPE; use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, Json, TypedHeader}; +use bytes::Bytes; +use common_query::prelude::GREPTIME_TIMESTAMP; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; use datatypes::value::column_data_to_json; +use lazy_static::lazy_static; +use loki_api::prost_types::Timestamp; use pipeline::error::PipelineTransformSnafu; use pipeline::util::to_pipeline_version; use pipeline::PipelineVersion; +use prost::Message; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{Deserializer, Map, Value}; @@ -37,22 +47,48 @@ use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, - UnsupportedContentTypeSnafu, + DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu, + PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; -use crate::http::greptime_manage_resp::GreptimedbManageResponse; -use crate::http::greptime_result_v1::GreptimedbV1Response; +use crate::http::extractor::LogTableName; +use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; +use crate::http::result::greptime_manage_resp::GreptimedbManageResponse; +use crate::http::result::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED, - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER, + METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE, }; +use crate::prom_store; use crate::query_handler::LogHandlerRef; const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_"; const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; +const LOKI_TABLE_NAME: &str = "loki_logs"; +const LOKI_LINE_COLUMN: &str = "line"; + +lazy_static! 
{
+    static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
+        ColumnSchema {
+            column_name: GREPTIME_TIMESTAMP.to_string(),
+            datatype: ColumnDataType::TimestampNanosecond.into(),
+            semantic_type: SemanticType::Timestamp.into(),
+            datatype_extension: None,
+            options: None,
+        },
+        ColumnSchema {
+            column_name: LOKI_LINE_COLUMN.to_string(),
+            datatype: ColumnDataType::String.into(),
+            semantic_type: SemanticType::Field.into(),
+            datatype_extension: None,
+            options: None,
+        },
+    ];
+}
+
 #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
 pub struct LogIngesterQueryParams {
     pub table: Option<String>,
@@ -352,6 +388,141 @@ pub async fn pipeline_dryrun(
     Ok(Json(result).into_response())
 }
 
+#[axum_macros::debug_handler]
+pub async fn loki_ingest(
+    State(log_state): State<LogState>,
+    Extension(mut ctx): Extension<QueryContext>,
+    TypedHeader(content_type): TypedHeader<ContentType>,
+    LogTableName(table_name): LogTableName,
+    bytes: Bytes,
+) -> Result<HttpResponse> {
+    ctx.set_channel(Channel::Loki);
+    let ctx = Arc::new(ctx);
+    let db = ctx.get_db_string();
+    let db_str = db.as_str();
+    let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
+    let exec_timer = Instant::now();
+
+    // validate the content type, then decompress the snappy-encoded request body
+    ensure!(
+        content_type.to_string() == CONTENT_TYPE_PROTOBUF_STR,
+        UnsupportedContentTypeSnafu { content_type }
+    );
+    let decompressed = prom_store::snappy_decompress(&bytes)?;
+    // the OTLP decode error variant is reused here for protobuf decode failures
+    let req = loki_api::logproto::PushRequest::decode(&decompressed[..])
+        .context(DecodeOtlpRequestSnafu)?;
+
+    // init schemas
+    let mut schemas = LOKI_INIT_SCHEMAS.clone();
+
+    let mut global_label_key_index: HashMap<String, i32> = HashMap::new();
+    global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
+    global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);
+
+    let mut rows = vec![];
+
+    for stream in req.streams {
+        // parse labels for each row
+        // encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
+        // the selector-style label string (e.g. `{job="a",env="b"}`) becomes a json5 object once `=` is rewritten to `:`
+        let labels = stream.labels.replace("=", ":");
+        // use a BTreeMap to keep the label keys ordered
+        let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
+
+        // process entries
+        for entry in stream.entries {
+            // skip entries that carry no timestamp
+            let Some(ts) = entry.timestamp else {
+                continue;
+            };
+            let line = entry.line;
+
+            // create a row pre-filled with nulls for every known column
+            let mut row = vec![GreptimeValue { value_data: None }; schemas.len()];
+            // insert ts and line
+            row[0] = GreptimeValue {
+                value_data: Some(ValueData::TimestampNanosecondValue(prost_ts_to_nano(&ts))),
+            };
+            row[1] = GreptimeValue {
+                value_data: Some(ValueData::StringValue(line)),
+            };
+            // insert labels
+            for (k, v) in labels.iter() {
+                if let Some(index) = global_label_key_index.get(k) {
+                    // the label already exists in the schema: set its value by index
+                    row[*index as usize] = GreptimeValue {
+                        value_data: Some(ValueData::StringValue(v.clone())),
+                    };
+                } else {
+                    // first occurrence of this label: extend the schema and append the value
+                    schemas.push(ColumnSchema {
+                        column_name: k.clone(),
+                        datatype: ColumnDataType::String.into(),
+                        semantic_type: SemanticType::Tag.into(),
+                        datatype_extension: None,
+                        options: None,
+                    });
+                    global_label_key_index.insert(k.clone(), (schemas.len() - 1) as i32);
+
+                    row.push(GreptimeValue {
+                        value_data: Some(ValueData::StringValue(v.clone())),
+                    });
+                }
+            }
+
+            rows.push(row);
+        }
+    }
+
+    // fill Null for missing values
+    for row in rows.iter_mut() {
+        if row.len() < schemas.len() {
+            for _ in row.len()..schemas.len() {
+                row.push(GreptimeValue { value_data: None });
+            }
+        }
+    }
+
+    let rows = Rows {
+        rows: rows.into_iter().map(|values| Row { values }).collect(),
+        schema: schemas,
+    };
+
+    let ins_req = RowInsertRequest {
+        table_name,
+        rows: Some(rows),
+    };
+    let ins_reqs = RowInsertRequests {
+        inserts: vec![ins_req],
+    };
+
+    let handler = log_state.log_handler;
+    let output = handler.insert_logs(ins_reqs, ctx).await;
+
+    if let Ok(Output {
+        data: OutputData::AffectedRows(rows),
+        meta: _,
+    }) = &output
+    {
+        METRIC_LOKI_LOGS_INGESTION_COUNTER
+            .with_label_values(&[db_str])
+            .inc_by(*rows as u64);
+        METRIC_LOKI_LOGS_INGESTION_ELAPSED
+            .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
+            .observe(exec_timer.elapsed().as_secs_f64());
+    } else {
+        METRIC_LOKI_LOGS_INGESTION_ELAPSED
+            .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
+            .observe(exec_timer.elapsed().as_secs_f64());
+    }
+
+    let response = GreptimedbV1Response::from_output(vec![output])
+        .await
+        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
+    Ok(response)
+}
+
 #[axum_macros::debug_handler]
 pub async fn log_ingester(
     State(log_state): State<LogState>,
@@ -531,6 +702,11 @@ pub struct LogState {
     pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
 }
 
+#[inline]
+fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
+    ts.seconds * 1_000_000_000 + ts.nanos as i64
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -565,4 +741,16 @@ mod tests {
             .to_string();
         assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
     }
+
+    #[test]
+    fn test_ts_to_nano() {
+        // ts = 1731748568804293888
+        // seconds = 1731748568
+        // nano = 804293888
+        let ts = Timestamp {
+            seconds: 1731748568,
+            nanos: 804293888,
+        };
+        assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
+    }
 }
diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs
new file mode 100644
index 0000000000..3a9bddcd8f
--- /dev/null
+++ b/src/servers/src/http/extractor.rs
@@ -0,0 +1,147 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use core::str;
+use std::result::Result as StdResult;
+
+use axum::async_trait;
+use axum::extract::FromRequestParts;
+use axum::http::request::Parts;
+use axum::http::StatusCode;
+use http::HeaderMap;
+use pipeline::SelectInfo;
+
+use crate::http::header::constants::{
+    GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
+    GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
+    GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
+};
+
+/// Axum extractor for optional target log table name from HTTP header
+/// using [`GREPTIME_LOG_TABLE_NAME_HEADER_NAME`] as key.
+pub struct LogTableName(pub Option<String>);
+
+#[async_trait]
+impl<S> FromRequestParts<S> for LogTableName
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let headers = &parts.headers;
+        string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName)
+    }
+}
+
+/// Axum extractor for optional target trace table name from HTTP header
+/// using [`GREPTIME_TRACE_TABLE_NAME_HEADER_NAME`] as key.
+pub struct TraceTableName(pub Option<String>);
+
+#[async_trait]
+impl<S> FromRequestParts<S> for TraceTableName
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let headers = &parts.headers;
+        string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName)
+    }
+}
+
+/// Axum extractor for select keys from HTTP header,
+/// to extract and uplift key-values from OTLP attributes.
+/// See [`SelectInfo`] for more details.
+pub struct SelectInfoWrapper(pub SelectInfo);
+
+#[async_trait]
+impl<S> FromRequestParts<S> for SelectInfoWrapper
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let select =
+            string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
+
+        match select {
+            Some(name) => {
+                if name.is_empty() {
+                    Ok(SelectInfoWrapper(Default::default()))
+                } else {
+                    Ok(SelectInfoWrapper(SelectInfo::from(name)))
+                }
+            }
+            None => Ok(SelectInfoWrapper(Default::default())),
+        }
+    }
+}
+
+/// Axum extractor for optional Pipeline name and version
+/// from HTTP headers.
+pub struct PipelineInfo {
+    pub pipeline_name: Option<String>,
+    pub pipeline_version: Option<String>,
+}
+
+#[async_trait]
+impl<S> FromRequestParts<S> for PipelineInfo
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let headers = &parts.headers;
+        let pipeline_name =
+            string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
+        let pipeline_version =
+            string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;
+        match (pipeline_name, pipeline_version) {
+            (Some(name), Some(version)) => Ok(PipelineInfo {
+                pipeline_name: Some(name),
+                pipeline_version: Some(version),
+            }),
+            (None, _) => Ok(PipelineInfo {
+                pipeline_name: None,
+                pipeline_version: None,
+            }),
+            (Some(name), None) => Ok(PipelineInfo {
+                pipeline_name: Some(name),
+                pipeline_version: None,
+            }),
+        }
+    }
+}
+
+#[inline]
+fn string_value_from_header(
+    headers: &HeaderMap,
+    header_key: &str,
+) -> StdResult<Option<String>, (StatusCode, String)> {
+    headers
+        .get(header_key)
+        .map(|value| {
+            String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
+                (
+                    StatusCode::BAD_REQUEST,
+                    format!("`{}` header is not valid UTF-8 string type.", header_key),
+                )
+            })
+        })
+        .transpose()
+}
diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs
index 4925c79639..c275940148 100644
--- a/src/servers/src/http/handler.rs
+++ b/src/servers/src/http/handler.rs
@@ -34,13 +34,13 @@ use serde_json::Value;
 use session::context::{Channel, QueryContext, QueryContextRef};
 
 use super::header::collect_plan_metrics;
-use crate::http::arrow_result::ArrowResponse;
-use crate::http::csv_result::CsvResponse;
-use crate::http::error_result::ErrorResponse;
-use crate::http::greptime_result_v1::GreptimedbV1Response;
-use
crate::http::influxdb_result_v1::InfluxdbV1Response; -use crate::http::json_result::JsonResponse; -use crate::http::table_result::TableResponse; +use crate::http::result::arrow_result::ArrowResponse; +use crate::http::result::csv_result::CsvResponse; +use crate::http::result::error_result::ErrorResponse; +use crate::http::result::greptime_result_v1::GreptimedbV1Response; +use crate::http::result::influxdb_result_v1::InfluxdbV1Response; +use crate::http::result::json_result::JsonResponse; +use crate::http::result::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, HttpResponse, ResponseFormat, diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index bf5b0a4ebc..51a07ca01f 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -67,7 +67,8 @@ pub static GREPTIME_DB_HEADER_NAME: HeaderName = pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName = HeaderName::from_static(constants::GREPTIME_TIMEZONE_HEADER_NAME); -pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf"); +pub static CONTENT_TYPE_PROTOBUF_STR: &str = "application/x-protobuf"; +pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF_STR); pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy"); pub struct GreptimeDbName(Option); diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 5059afd972..b5c4607c29 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -12,19 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::str; -use std::result::Result as StdResult; use std::sync::Arc; -use axum::extract::{FromRequestParts, State}; -use axum::http::header::HeaderValue; -use axum::http::request::Parts; -use axum::http::{header, StatusCode}; +use axum::extract::State; +use axum::http::header; use axum::response::IntoResponse; -use axum::{async_trait, Extension}; +use axum::Extension; use bytes::Bytes; use common_telemetry::tracing; -use http::HeaderMap; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, }; @@ -35,19 +30,14 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; use pipeline::util::to_pipeline_version; -use pipeline::{PipelineWay, SelectInfo}; +use pipeline::PipelineWay; use prost::Message; use session::context::{Channel, QueryContext}; use snafu::prelude::*; -use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; -use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result}; -use crate::http::header::constants::{ - GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, -}; -use crate::otlp::logs::LOG_TABLE_NAME; +use crate::error::{self, PipelineSnafu, Result}; +use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName}; use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; @@ -83,18 +73,13 @@ pub async fn metrics( #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( State(handler): State, - header: HeaderMap, + 
TraceTableName(table_name): TraceTableName, Extension(mut query_ctx): Extension, bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); - let table_name = extract_string_value_from_header( - &header, - GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, - Some(TRACE_TABLE_NAME), - )? - // safety here, we provide default value for table_name - .unwrap(); + let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string()); + query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED @@ -113,139 +98,17 @@ pub async fn traces( }) } -pub struct PipelineInfo { - pub pipeline_name: Option, - pub pipeline_version: Option, -} - -fn parse_header_value_to_string(header: &HeaderValue) -> Result { - String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu) -} - -fn extract_string_value_from_header( - headers: &HeaderMap, - header: &str, - default_table_name: Option<&str>, -) -> Result> { - let table_name = headers.get(header); - match table_name { - Some(name) => parse_header_value_to_string(name).map(Some), - None => match default_table_name { - Some(name) => Ok(Some(name.to_string())), - None => Ok(None), - }, - } -} - -fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> { - move |_| { - ( - StatusCode::BAD_REQUEST, - format!("`{}` header is not valid UTF-8 string type.", header_name), - ) - } -} - -#[async_trait] -impl FromRequestParts for PipelineInfo -where - S: Send + Sync, -{ - type Rejection = (StatusCode, String); - - async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let headers = &parts.headers; - let pipeline_name = - extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None) - .map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?; - let pipeline_version = extract_string_value_from_header( - headers, - GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - None, - ) - .map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?; - match (pipeline_name, pipeline_version) { - (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: Some(name), - pipeline_version: Some(version), - }), - (None, _) => Ok(PipelineInfo { - pipeline_name: None, - pipeline_version: None, - }), - (Some(name), None) => Ok(PipelineInfo { - pipeline_name: Some(name), - pipeline_version: None, - }), - } - } -} - -pub struct TableInfo { - table_name: String, -} - -#[async_trait] -impl FromRequestParts for TableInfo -where - S: Send + Sync, -{ - type Rejection = (StatusCode, String); - - async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let table_name = extract_string_value_from_header( - &parts.headers, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, - Some(LOG_TABLE_NAME), - ) - .map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))? 
- // safety here, we provide default value for table_name - .unwrap(); - - Ok(TableInfo { table_name }) - } -} - -pub struct SelectInfoWrapper(SelectInfo); - -#[async_trait] -impl FromRequestParts for SelectInfoWrapper -where - S: Send + Sync, -{ - type Rejection = (StatusCode, String); - - async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let select = extract_string_value_from_header( - &parts.headers, - GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, - None, - ) - .map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?; - - match select { - Some(name) => { - if name.is_empty() { - Ok(SelectInfoWrapper(Default::default())) - } else { - Ok(SelectInfoWrapper(SelectInfo::from(name))) - } - } - None => Ok(SelectInfoWrapper(Default::default())), - } - } -} - #[axum_macros::debug_handler] #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))] pub async fn logs( State(handler): State, Extension(mut query_ctx): Extension, pipeline_info: PipelineInfo, - table_info: TableInfo, + LogTableName(tablename): LogTableName, SelectInfoWrapper(select_info): SelectInfoWrapper, bytes: Bytes, ) -> Result> { + let tablename = tablename.unwrap_or_else(|| "opentelemetry_logs".to_string()); let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); @@ -272,7 +135,7 @@ pub async fn logs( }; handler - .logs(request, pipeline_way, table_info.table_name, query_ctx) + .logs(request, pipeline_way, tablename, query_ctx) .await .map(|o| OtlpResponse { resp_body: ExportLogsServiceResponse { diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 6f749f2595..684845e69a 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -45,7 +45,7 @@ use serde_json::Value; use session::context::QueryContext; use snafu::{Location, OptionExt, ResultExt}; -pub use super::prometheus_resp::PrometheusJsonResponse; +pub use super::result::prometheus_resp::PrometheusJsonResponse; use crate::error::{ CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu, UnexpectedResultSnafu, diff --git a/src/servers/src/http/result.rs b/src/servers/src/http/result.rs new file mode 100644 index 0000000000..8fd4df8576 --- /dev/null +++ b/src/servers/src/http/result.rs @@ -0,0 +1,23 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+pub(crate) mod arrow_result;
+pub(crate) mod csv_result;
+pub mod error_result;
+pub(crate) mod greptime_manage_resp;
+pub mod greptime_result_v1;
+pub mod influxdb_result_v1;
+pub(crate) mod json_result;
+pub(crate) mod prometheus_resp;
+pub(crate) mod table_result;
diff --git a/src/servers/src/http/arrow_result.rs b/src/servers/src/http/result/arrow_result.rs
similarity index 99%
rename from src/servers/src/http/arrow_result.rs
rename to src/servers/src/http/result/arrow_result.rs
index 6a739fee04..e6e9abe6d3 100644
--- a/src/servers/src/http/arrow_result.rs
+++ b/src/servers/src/http/result/arrow_result.rs
@@ -29,8 +29,8 @@ use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 
 use crate::error::{self, Error};
-use crate::http::error_result::ErrorResponse;
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
+use crate::http::result::error_result::ErrorResponse;
 use crate::http::{HttpResponse, ResponseFormat};
 
 #[derive(Serialize, Deserialize, Debug, JsonSchema)]
diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/result/csv_result.rs
similarity index 95%
rename from src/servers/src/http/csv_result.rs
rename to src/servers/src/http/result/csv_result.rs
index d6b512653b..4f78eb8da7 100644
--- a/src/servers/src/http/csv_result.rs
+++ b/src/servers/src/http/result/csv_result.rs
@@ -23,10 +23,9 @@ use mime_guess::mime;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 
-use super::process_with_limit;
-use crate::http::error_result::ErrorResponse;
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
-use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
+use crate::http::result::error_result::ErrorResponse;
+use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
 
 #[derive(Serialize, Deserialize, Debug, JsonSchema)]
 pub struct CsvResponse {
diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/result/error_result.rs
similarity index 60%
rename from src/servers/src/http/error_result.rs
rename to src/servers/src/http/result/error_result.rs
index 40766d2cbf..bbc488e562 100644
--- a/src/servers/src/http/error_result.rs
+++ b/src/servers/src/http/result/error_result.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use axum::http::{HeaderValue, StatusCode as HttpStatusCode}; +use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::Json; use common_error::ext::ErrorExt; @@ -21,6 +21,7 @@ use common_telemetry::{debug, error}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::error::status_code_to_http_status; use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE; use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME; @@ -87,49 +88,3 @@ impl IntoResponse for ErrorResponse { (status_code, resp).into_response() } } - -pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode { - match status_code { - StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK, - - StatusCode::Unsupported - | StatusCode::InvalidArguments - | StatusCode::InvalidSyntax - | StatusCode::RequestOutdated - | StatusCode::RegionAlreadyExists - | StatusCode::TableColumnExists - | StatusCode::TableAlreadyExists - | StatusCode::RegionNotFound - | StatusCode::DatabaseNotFound - | StatusCode::TableNotFound - | StatusCode::TableColumnNotFound - | StatusCode::PlanQuery - | StatusCode::DatabaseAlreadyExists - | StatusCode::FlowNotFound - | StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST, - - StatusCode::AuthHeaderNotFound - | StatusCode::InvalidAuthHeader - | StatusCode::UserNotFound - | StatusCode::UnsupportedPasswordType - | StatusCode::UserPasswordMismatch - | StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED, - - StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN, - - StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS, - - StatusCode::RegionNotReady - | StatusCode::TableUnavailable - | StatusCode::RegionBusy - | StatusCode::StorageUnavailable - | StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE, - - StatusCode::Internal - | StatusCode::Unexpected - | StatusCode::IllegalState - | StatusCode::Unknown - | StatusCode::RuntimeResourcesExhausted - | StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR, - } -} diff --git a/src/servers/src/http/greptime_manage_resp.rs b/src/servers/src/http/result/greptime_manage_resp.rs similarity index 100% rename from src/servers/src/http/greptime_manage_resp.rs rename to src/servers/src/http/result/greptime_manage_resp.rs diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/result/greptime_result_v1.rs similarity index 92% rename from src/servers/src/http/greptime_result_v1.rs rename to src/servers/src/http/result/greptime_result_v1.rs index de8284c40a..ba90b6b329 100644 --- a/src/servers/src/http/greptime_result_v1.rs +++ b/src/servers/src/http/result/greptime_result_v1.rs @@ -22,10 +22,10 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; -use super::header::GREPTIME_DB_HEADER_METRICS; -use super::process_with_limit; -use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; -use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; +use crate::http::header::{ + GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT, GREPTIME_DB_HEADER_METRICS, +}; +use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat}; #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub struct GreptimedbV1Response { diff --git a/src/servers/src/http/influxdb_result_v1.rs b/src/servers/src/http/result/influxdb_result_v1.rs similarity index 99% rename from 
src/servers/src/http/influxdb_result_v1.rs rename to src/servers/src/http/result/influxdb_result_v1.rs index 3fa4594728..ae65b8fa0a 100644 --- a/src/servers/src/http/influxdb_result_v1.rs +++ b/src/servers/src/http/result/influxdb_result_v1.rs @@ -23,8 +23,8 @@ use serde_json::Value; use snafu::ResultExt; use crate::error::{Error, ToJsonSnafu}; -use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::result::error_result::ErrorResponse; use crate::http::{Epoch, HttpResponse, ResponseFormat}; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/result/json_result.rs similarity index 96% rename from src/servers/src/http/json_result.rs rename to src/servers/src/http/result/json_result.rs index bf4e4d7770..71546c570d 100644 --- a/src/servers/src/http/json_result.rs +++ b/src/servers/src/http/result/json_result.rs @@ -21,10 +21,9 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Map, Value}; -use super::process_with_limit; -use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; -use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; +use crate::http::result::error_result::ErrorResponse; +use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat}; /// The json format here is different from the default json output of `GreptimedbV1` result. /// `JsonResponse` is intended to make it easier for user to consume data. diff --git a/src/servers/src/http/prometheus_resp.rs b/src/servers/src/http/result/prometheus_resp.rs similarity index 98% rename from src/servers/src/http/prometheus_resp.rs rename to src/servers/src/http/result/prometheus_resp.rs index 3ec4552696..84497c1533 100644 --- a/src/servers/src/http/prometheus_resp.rs +++ b/src/servers/src/http/result/prometheus_resp.rs @@ -32,12 +32,13 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{OptionExt, ResultExt}; -use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS}; -use super::prometheus::{ +use crate::error::{ + status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, +}; +use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS}; +use crate::http::prometheus::{ PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse, }; -use crate::error::{CollectRecordbatchSnafu, Result, UnexpectedResultSnafu}; -use crate::http::error_result::status_code_to_http_status; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] pub struct PrometheusJsonResponse { diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/result/table_result.rs similarity index 97% rename from src/servers/src/http/table_result.rs rename to src/servers/src/http/result/table_result.rs index dacef51bea..9a69b08c0a 100644 --- a/src/servers/src/http/table_result.rs +++ b/src/servers/src/http/result/table_result.rs @@ -24,10 +24,9 @@ use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use super::process_with_limit; -use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; -use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; +use 
crate::http::result::error_result::ErrorResponse; +use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat}; #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub struct TableResponse { diff --git a/src/servers/src/http/script.rs b/src/servers/src/http/script.rs index cb8ca7fc2b..278e54457a 100644 --- a/src/servers/src/http/script.rs +++ b/src/servers/src/http/script.rs @@ -26,7 +26,7 @@ use session::context::QueryContext; use snafu::ResultExt; use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu}; -use crate::http::error_result::ErrorResponse; +use crate::http::result::error_result::ErrorResponse; use crate::http::{ApiState, GreptimedbV1Response, HttpResponse}; macro_rules! json_err { diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index ead86f3ad8..87ab38dc82 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -161,6 +161,19 @@ lazy_static! { &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] ) .unwrap(); + pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( + "greptime_servers_loki_logs_ingestion_counter", + "servers loki logs ingestion counter", + &[METRIC_DB_LABEL] + ) + .unwrap(); + pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_loki_logs_ingestion_elapsed", + "servers loki logs ingestion elapsed", + &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] + ) + .unwrap(); pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_logs_transform_elapsed", diff --git a/src/session/src/context.rs b/src/session/src/context.rs index cab351176b..4e681253c1 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -372,6 +372,7 @@ pub enum Channel { Grpc = 6, Influx = 7, Opentsdb = 8, + Loki = 9, } impl From for Channel { @@ -385,6 +386,7 @@ impl From for Channel { 6 => Self::Grpc, 7 => Self::Influx, 8 => Self::Opentsdb, + 9 => Self::Loki, _ => Self::Unknown, } @@ -412,6 +414,7 @@ impl Display for Channel { Channel::Grpc => write!(f, "grpc"), Channel::Influx => write!(f, "influx"), Channel::Opentsdb => write!(f, "opentsdb"), + Channel::Loki => write!(f, "loki"), Channel::Unknown => write!(f, "unknown"), } } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index dba8688eea..3fd12f1494 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -45,6 +45,7 @@ flow.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true futures-util.workspace = true +loki-api = "0.1" meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } moka.workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a0cc3b2f5d..c0c3d84a7a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::io::Write; +use std::str::FromStr; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; @@ -21,18 +22,21 @@ use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter}; +use loki_api::prost_types::Timestamp; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use 
opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
 use prost::Message;
 use serde_json::{json, Value};
-use servers::http::error_result::ErrorResponse;
-use servers::http::greptime_result_v1::GreptimedbV1Response;
 use servers::http::handler::HealthResponse;
+use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
 use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
-use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
 use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
+use servers::http::result::error_result::ErrorResponse;
+use servers::http::result::greptime_result_v1::GreptimedbV1Response;
+use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
 use servers::http::test_helpers::{TestClient, TestResponse};
 use servers::http::GreptimeQueryOutput;
 use servers::prom_store;
@@ -92,6 +96,7 @@ macro_rules! http_tests {
                 test_otlp_metrics,
                 test_otlp_traces,
                 test_otlp_logs,
+                test_loki_logs,
             );
         )*
     };
@@ -1690,6 +1695,69 @@ pub async fn test_otlp_logs(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_loki_logs(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loki_logs").await;
+
+    let client = TestClient::new(app);
+
+    // init loki request
+    let req = PushRequest {
+        streams: vec![StreamAdapter {
+            labels: "{service=\"test\",source=\"integration\"}".to_string(),
+            entries: vec![EntryAdapter {
+                timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
+                line: "this is a log message".to_string(),
+            }],
+            hash: rand::random(),
+        }],
+    };
+    let encode = req.encode_to_vec();
+    let body = prom_store::snappy_compress(&encode).unwrap();
+
+    // write to loki
+    let res = send_req(
+        &client,
+        vec![
+            (
+                HeaderName::from_static("content-type"),
+                HeaderValue::from_static("application/x-protobuf"),
+            ),
+            (
+                HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
+                HeaderValue::from_static("loki_table_name"),
+            ),
+        ],
+        "/v1/loki/api/v1/push",
+        body,
+        false,
+    )
+    .await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    // test schema
+    let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n  \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n  \\\"line\\\" STRING NULL,\\n  \\\"service\\\" STRING NULL,\\n  \\\"source\\\" STRING NULL,\\n  TIME INDEX (\\\"greptime_timestamp\\\"),\\n  PRIMARY KEY (\\\"service\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true'\\n)\"]]";
+    validate_data(
+        "loki_schema",
+        &client,
+        "show create table loki_table_name;",
+        expected,
+    )
+    .await;
+
+    // test content
+    let expected = r#"[[1730976830000000000,"this is a log message","test","integration"]]"#;
+    validate_data(
+        "loki_content",
+        &client,
+        "select * from loki_table_name;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
     let res = client
         .get(format!("/v1/sql?sql={sql}").as_str())