feat: Loki remote write (#4941)

* chore: add debug loki remote write url

* chore: add decode snappy

* chore: format output

* feature: impl loki remote write

* fix: special labels deserialize

* chore: move result to folder

* chore: finish todo in loki write

* test: loki write

* chore: fix cr issue

* chore: fix cr issue

* chore: fix cr issue

* chore: update pre-commit config

* chore: fix cr issue

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
shuiyisong
2024-11-18 16:39:17 +08:00
committed by GitHub
parent 2b72e66536
commit c199604ece
28 changed files with 607 additions and 269 deletions

View File

@@ -17,6 +17,6 @@ repos:
- id: fmt
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [push]
stages: [pre-push]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]

13
Cargo.lock generated
View File

@@ -6193,6 +6193,16 @@ dependencies = [
"uuid",
]
[[package]]
name = "loki-api"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "674883a98273598ac3aad4301724c56734bea90574c5033af067e8f9fb5eb399"
dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
]
[[package]]
name = "lrlex"
version = "0.13.7"
@@ -10988,8 +10998,10 @@ dependencies = [
"hyper 0.14.30",
"influxdb_line_protocol",
"itertools 0.10.5",
"json5",
"jsonb",
"lazy_static",
"loki-api",
"mime_guess",
"mysql_async",
"notify",
@@ -12387,6 +12399,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"loki-api",
"meta-client",
"meta-srv",
"moka",

View File

@@ -19,8 +19,8 @@ use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use humantime::format_duration;
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;

View File

@@ -63,8 +63,10 @@ humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
json5 = "0.4"
jsonb.workspace = true
lazy_static.workspace = true
loki-api = "0.1"
mime_guess = "2.0"
notify.workspace = true
object-pool = "0.5"

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use std::net::SocketAddr;
use std::string::FromUtf8Error;
use axum::http::StatusCode as HttpStatusCode;
use axum::response::{IntoResponse, Response};
use axum::{http, Json};
use base64::DecodeError;
@@ -30,8 +31,6 @@ use query::parser::PromQuery;
use serde_json::json;
use snafu::{Location, Snafu};
use crate::http::error_result::status_code_to_http_status;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -499,6 +498,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse payload as json5"))]
ParseJson5 {
#[snafu(source)]
error: json5::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported content type: {:?}", content_type))]
UnsupportedContentType {
content_type: ContentType,
@@ -631,6 +638,7 @@ impl ErrorExt for Error {
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
@@ -719,3 +727,49 @@ impl IntoResponse for Error {
(status, body).into_response()
}
}
pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
match status_code {
StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK,
StatusCode::Unsupported
| StatusCode::InvalidArguments
| StatusCode::InvalidSyntax
| StatusCode::RequestOutdated
| StatusCode::RegionAlreadyExists
| StatusCode::TableColumnExists
| StatusCode::TableAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::DatabaseNotFound
| StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::PlanQuery
| StatusCode::DatabaseAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,
StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,
StatusCode::RegionNotReady
| StatusCode::TableUnavailable
| StatusCode::RegionBusy
| StatusCode::StorageUnavailable
| StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE,
StatusCode::Internal
| StatusCode::Unexpected
| StatusCode::IllegalState
| StatusCode::Unknown
| StatusCode::RuntimeResourcesExhausted
| StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR,
}
}

View File

@@ -50,20 +50,20 @@ use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;
use self::authorize::AuthState;
use self::table_result::TableResponse;
use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
use crate::http::arrow_result::ArrowResponse;
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::json_result::JsonResponse;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
range_query, series_query,
};
use crate::http::result::arrow_result::ArrowResponse;
use crate::http::result::csv_result::CsvResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
use crate::http::result::json_result::JsonResponse;
use crate::interceptor::LogIngestInterceptorRef;
use crate::metrics::http_metrics_layer;
use crate::metrics_handler::MetricsHandler;
@@ -76,8 +76,11 @@ use crate::query_handler::{
use crate::server::Server;
pub mod authorize;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod dyn_log;
pub mod event;
mod extractor;
pub mod handler;
pub mod header;
pub mod influxdb;
@@ -87,19 +90,8 @@ pub mod otlp;
pub mod pprof;
pub mod prom_store;
pub mod prometheus;
mod prometheus_resp;
pub mod result;
pub mod script;
pub mod arrow_result;
pub mod csv_result;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod error_result;
pub mod greptime_manage_resp;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub mod json_result;
pub mod table_result;
mod timeout;
pub(crate) use timeout::DynamicTimeoutLayer;
@@ -621,17 +613,22 @@ impl HttpServerBuilder {
validator: Option<LogValidatorRef>,
ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(LogState {
log_handler: handler,
log_validator: validator,
ingest_interceptor,
}),
),
..self
}
let log_state = LogState {
log_handler: handler,
log_validator: validator,
ingest_interceptor,
};
let router = self.router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(log_state.clone()),
);
let router = router.nest(
&format!("/{HTTP_API_VERSION}/loki"),
HttpServer::route_loki(log_state),
);
Self { router, ..self }
}
pub fn with_plugins(self, plugins: Plugins) -> Self {
@@ -768,6 +765,12 @@ impl HttpServer {
.with_state(metrics_handler)
}
fn route_loki<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/api/v1/push", routing::post(event::loki_ingest))
.with_state(log_state)
}
fn route_log<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/logs", routing::post(event::log_ingester))

View File

@@ -36,7 +36,7 @@ use crate::error::{
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::error_result::ErrorResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::HTTP_API_PREFIX;
use crate::influxdb::{is_influxdb_request, is_influxdb_v2_request};

View File

@@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Instant;
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value as GreptimeValue,
};
use axum::body::HttpBody;
use axum::extract::{FromRequest, Multipart, Path, Query, State};
use axum::headers::ContentType;
@@ -24,12 +29,17 @@ use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
use bytes::Bytes;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineVersion;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
@@ -37,22 +47,48 @@ use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
UnsupportedContentTypeSnafu,
DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER,
METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::prom_store;
use crate::query_handler::LogHandlerRef;
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
const LOKI_TABLE_NAME: &str = "loki_logs";
const LOKI_LINE_COLUMN: &str = "line";
lazy_static! {
static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: LOKI_LINE_COLUMN.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
];
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
pub table: Option<String>,
@@ -352,6 +388,146 @@ pub async fn pipeline_dryrun(
Ok(Json(result).into_response())
}
#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,
Extension(mut ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
LogTableName(table_name): LogTableName,
bytes: Bytes,
) -> Result<HttpResponse> {
ctx.set_channel(Channel::Loki);
let ctx = Arc::new(ctx);
let db = ctx.get_db_string();
let db_str = db.as_str();
let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
let exec_timer = Instant::now();
// decompress req
ensure!(
content_type.to_string() == CONTENT_TYPE_PROTOBUF_STR,
UnsupportedContentTypeSnafu { content_type }
);
let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
let req = loki_api::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;
// init schemas
let mut schemas = LOKI_INIT_SCHEMAS.clone();
let mut global_label_key_index: HashMap<String, i32> = HashMap::new();
global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);
let mut rows = vec![];
for stream in req.streams {
// parse labels for each row
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
let labels = stream.labels.replace("=", ":");
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
// process entries
for entry in stream.entries {
let ts = if let Some(ts) = entry.timestamp {
ts
} else {
continue;
};
let line = entry.line;
// create and init row
let mut row = Vec::with_capacity(schemas.capacity());
for _ in 0..row.capacity() {
row.push(GreptimeValue { value_data: None });
}
// insert ts and line
row[0] = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(prost_ts_to_nano(&ts))),
};
row[1] = GreptimeValue {
value_data: Some(ValueData::StringValue(line)),
};
// insert labels
for (k, v) in labels.iter() {
if let Some(index) = global_label_key_index.get(k) {
// exist in schema
// insert value using index
row[*index as usize] = GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
};
} else {
// not exist
// add schema and append to values
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
global_label_key_index.insert(k.clone(), (schemas.len() - 1) as i32);
row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
});
}
}
rows.push(row);
}
}
// fill Null for missing values
for row in rows.iter_mut() {
if row.len() < schemas.len() {
for _ in row.len()..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
}
}
let rows = Rows {
rows: rows.into_iter().map(|values| Row { values }).collect(),
schema: schemas,
};
let ins_req = RowInsertRequest {
table_name,
rows: Some(rows),
};
let ins_reqs = RowInsertRequests {
inserts: vec![ins_req],
};
let handler = log_state.log_handler;
let output = handler.insert_logs(ins_reqs, ctx).await;
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}
let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}
#[axum_macros::debug_handler]
pub async fn log_ingester(
State(log_state): State<LogState>,
@@ -531,6 +707,11 @@ pub struct LogState {
pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
#[inline]
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
}
#[cfg(test)]
mod tests {
use super::*;
@@ -565,4 +746,16 @@ mod tests {
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
}
#[test]
fn test_ts_to_nano() {
// ts = 1731748568804293888
// seconds = 1731748568
// nano = 804293888
let ts = Timestamp {
seconds: 1731748568,
nanos: 804293888,
};
assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
}
}

View File

@@ -0,0 +1,147 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use core::str;
use std::result::Result as StdResult;
use axum::async_trait;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::http::StatusCode;
use http::HeaderMap;
use pipeline::SelectInfo;
use crate::http::header::constants::{
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
/// Axum extractor for optional target log table name from HTTP header
/// using [`GREPTIME_LOG_TABLE_NAME_HEADER_NAME`] as key.
pub struct LogTableName(pub Option<String>);
#[async_trait]
impl<S> FromRequestParts<S> for LogTableName
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName)
}
}
/// Axum extractor for optional target trace table name from HTTP header
/// using [`GREPTIME_TRACE_TABLE_NAME_HEADER_NAME`] as key.
pub struct TraceTableName(pub Option<String>);
#[async_trait]
impl<S> FromRequestParts<S> for TraceTableName
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName)
}
}
/// Axum extractor for select keys from HTTP header,
/// to extract and uplift key-values from OTLP attributes.
/// See [`SelectInfo`] for more details.
pub struct SelectInfoWrapper(pub SelectInfo);
#[async_trait]
impl<S> FromRequestParts<S> for SelectInfoWrapper
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let select =
string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
match select {
Some(name) => {
if name.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
Ok(SelectInfoWrapper(SelectInfo::from(name)))
}
}
None => Ok(SelectInfoWrapper(Default::default())),
}
}
}
/// Axum extractor for optional Pipeline name and version
/// from HTTP headers.
pub struct PipelineInfo {
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
}
#[async_trait]
impl<S> FromRequestParts<S> for PipelineInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let headers = &parts.headers;
let pipeline_name =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
let pipeline_version =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(name),
pipeline_version: Some(version),
}),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(name),
pipeline_version: None,
}),
}
}
}
#[inline]
fn string_value_from_header(
headers: &HeaderMap,
header_key: &str,
) -> StdResult<Option<String>, (StatusCode, String)> {
headers
.get(header_key)
.map(|value| {
String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", header_key),
)
})
})
.transpose()
}

View File

@@ -34,13 +34,13 @@ use serde_json::Value;
use session::context::{Channel, QueryContext, QueryContextRef};
use super::header::collect_plan_metrics;
use crate::http::arrow_result::ArrowResponse;
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::json_result::JsonResponse;
use crate::http::table_result::TableResponse;
use crate::http::result::arrow_result::ArrowResponse;
use crate::http::result::csv_result::CsvResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
use crate::http::result::json_result::JsonResponse;
use crate::http::result::table_result::TableResponse;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
HttpResponse, ResponseFormat,

View File

@@ -67,7 +67,8 @@ pub static GREPTIME_DB_HEADER_NAME: HeaderName =
pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName =
HeaderName::from_static(constants::GREPTIME_TIMEZONE_HEADER_NAME);
pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
pub static CONTENT_TYPE_PROTOBUF_STR: &str = "application/x-protobuf";
pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF_STR);
pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy");
pub struct GreptimeDbName(Option<String>);

View File

@@ -12,19 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::str;
use std::result::Result as StdResult;
use std::sync::Arc;
use axum::extract::{FromRequestParts, State};
use axum::http::header::HeaderValue;
use axum::http::request::Parts;
use axum::http::{header, StatusCode};
use axum::extract::State;
use axum::http::header;
use axum::response::IntoResponse;
use axum::{async_trait, Extension};
use axum::Extension;
use bytes::Bytes;
use common_telemetry::tracing;
use http::HeaderMap;
use opentelemetry_proto::tonic::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
};
@@ -35,19 +30,14 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::{PipelineWay, SelectInfo};
use pipeline::PipelineWay;
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
use crate::otlp::logs::LOG_TABLE_NAME;
use crate::error::{self, PipelineSnafu, Result};
use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
@@ -83,18 +73,13 @@ pub async fn metrics(
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
header: HeaderMap,
TraceTableName(table_name): TraceTableName,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
let db = query_ctx.get_db_string();
let table_name = extract_string_value_from_header(
&header,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
Some(TRACE_TABLE_NAME),
)?
// safety here, we provide default value for table_name
.unwrap();
let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string());
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
@@ -113,139 +98,17 @@ pub async fn traces(
})
}
pub struct PipelineInfo {
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
}
fn parse_header_value_to_string(header: &HeaderValue) -> Result<String> {
String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu)
}
fn extract_string_value_from_header(
headers: &HeaderMap,
header: &str,
default_table_name: Option<&str>,
) -> Result<Option<String>> {
let table_name = headers.get(header);
match table_name {
Some(name) => parse_header_value_to_string(name).map(Some),
None => match default_table_name {
Some(name) => Ok(Some(name.to_string())),
None => Ok(None),
},
}
}
fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> {
move |_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", header_name),
)
}
}
#[async_trait]
impl<S> FromRequestParts<S> for PipelineInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let headers = &parts.headers;
let pipeline_name =
extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?;
let pipeline_version = extract_string_value_from_header(
headers,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?;
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(name),
pipeline_version: Some(version),
}),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(name),
pipeline_version: None,
}),
}
}
}
pub struct TableInfo {
table_name: String,
}
#[async_trait]
impl<S> FromRequestParts<S> for TableInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
Some(LOG_TABLE_NAME),
)
.map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))?
// safety here, we provide default value for table_name
.unwrap();
Ok(TableInfo { table_name })
}
}
pub struct SelectInfoWrapper(SelectInfo);
#[async_trait]
impl<S> FromRequestParts<S> for SelectInfoWrapper
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let select = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?;
match select {
Some(name) => {
if name.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
Ok(SelectInfoWrapper(SelectInfo::from(name)))
}
}
None => Ok(SelectInfoWrapper(Default::default())),
}
}
}
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
pub async fn logs(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
pipeline_info: PipelineInfo,
table_info: TableInfo,
LogTableName(tablename): LogTableName,
SelectInfoWrapper(select_info): SelectInfoWrapper,
bytes: Bytes,
) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
let tablename = tablename.unwrap_or_else(|| "opentelemetry_logs".to_string());
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
@@ -272,7 +135,7 @@ pub async fn logs(
};
handler
.logs(request, pipeline_way, table_info.table_name, query_ctx)
.logs(request, pipeline_way, tablename, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportLogsServiceResponse {

View File

@@ -45,7 +45,7 @@ use serde_json::Value;
use session::context::QueryContext;
use snafu::{Location, OptionExt, ResultExt};
pub use super::prometheus_resp::PrometheusJsonResponse;
pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu,
UnexpectedResultSnafu,

View File

@@ -0,0 +1,23 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod arrow_result;
pub(crate) mod csv_result;
pub mod error_result;
pub(crate) mod greptime_manage_resp;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub(crate) mod json_result;
pub(crate) mod prometheus_resp;
pub(crate) mod table_result;

View File

@@ -29,8 +29,8 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{self, Error};
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]

View File

@@ -23,10 +23,10 @@ use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::process_with_limit;
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
// use super::process_with_limit;
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct CsvResponse {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::http::{HeaderValue, StatusCode as HttpStatusCode};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
@@ -21,6 +21,7 @@ use common_telemetry::{debug, error};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::error::status_code_to_http_status;
use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use crate::http::header::GREPTIME_DB_HEADER_EXECUTION_TIME;
@@ -87,49 +88,3 @@ impl IntoResponse for ErrorResponse {
(status_code, resp).into_response()
}
}
pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
match status_code {
StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK,
StatusCode::Unsupported
| StatusCode::InvalidArguments
| StatusCode::InvalidSyntax
| StatusCode::RequestOutdated
| StatusCode::RegionAlreadyExists
| StatusCode::TableColumnExists
| StatusCode::TableAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::DatabaseNotFound
| StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::PlanQuery
| StatusCode::DatabaseAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,
StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,
StatusCode::RegionNotReady
| StatusCode::TableUnavailable
| StatusCode::RegionBusy
| StatusCode::StorageUnavailable
| StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE,
StatusCode::Internal
| StatusCode::Unexpected
| StatusCode::IllegalState
| StatusCode::Unknown
| StatusCode::RuntimeResourcesExhausted
| StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR,
}
}

View File

@@ -22,10 +22,10 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::header::GREPTIME_DB_HEADER_METRICS;
use super::process_with_limit;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
use crate::http::header::{
GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT, GREPTIME_DB_HEADER_METRICS,
};
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct GreptimedbV1Response {

View File

@@ -23,8 +23,8 @@ use serde_json::Value;
use snafu::ResultExt;
use crate::error::{Error, ToJsonSnafu};
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{Epoch, HttpResponse, ResponseFormat};
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]

View File

@@ -21,10 +21,9 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use super::process_with_limit;
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
/// The json format here is different from the default json output of `GreptimedbV1` result.
/// `JsonResponse` is intended to make it easier for user to consume data.

View File

@@ -32,12 +32,13 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
use super::prometheus::{
use crate::error::{
status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
};
use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
use crate::http::prometheus::{
PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
};
use crate::error::{CollectRecordbatchSnafu, Result, UnexpectedResultSnafu};
use crate::http::error_result::status_code_to_http_status;
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PrometheusJsonResponse {

View File

@@ -24,10 +24,9 @@ use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::process_with_limit;
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct TableResponse {

View File

@@ -26,7 +26,7 @@ use session::context::QueryContext;
use snafu::ResultExt;
use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::error_result::ErrorResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse};
macro_rules! json_err {

View File

@@ -161,6 +161,19 @@ lazy_static! {
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
"greptime_servers_loki_logs_ingestion_counter",
"servers loki logs ingestion counter",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_loki_logs_ingestion_elapsed",
"servers loki logs ingestion elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_transform_elapsed",

View File

@@ -372,6 +372,7 @@ pub enum Channel {
Grpc = 6,
Influx = 7,
Opentsdb = 8,
Loki = 9,
}
impl From<u32> for Channel {
@@ -385,6 +386,7 @@ impl From<u32> for Channel {
6 => Self::Grpc,
7 => Self::Influx,
8 => Self::Opentsdb,
9 => Self::Loki,
_ => Self::Unknown,
}
@@ -412,6 +414,7 @@ impl Display for Channel {
Channel::Grpc => write!(f, "grpc"),
Channel::Influx => write!(f, "influx"),
Channel::Opentsdb => write!(f, "opentsdb"),
Channel::Loki => write!(f, "loki"),
Channel::Unknown => write!(f, "unknown"),
}
}

View File

@@ -45,6 +45,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
loki-api = "0.1"
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
moka.workspace = true

View File

@@ -14,6 +14,7 @@
use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;
use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
@@ -21,18 +22,21 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
use prost::Message;
use serde_json::{json, Value};
use servers::http::error_result::ErrorResponse;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::handler::HealthResponse;
use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
@@ -92,6 +96,7 @@ macro_rules! http_tests {
test_otlp_metrics,
test_otlp_traces,
test_otlp_logs,
test_loki_logs,
);
)*
};
@@ -1690,6 +1695,69 @@ pub async fn test_otlp_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_loki_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loke_logs").await;
let client = TestClient::new(app);
// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: "{service=\"test\",source=\"integration\"}".to_string(),
entries: vec![EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
}],
hash: rand::random(),
}],
};
let encode = req.encode_to_vec();
let body = prom_store::snappy_compress(&encode).unwrap();
// write to loki
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_table_name"),
),
],
"/v1/loki/api/v1/push",
body,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_schema",
&client,
"show create table loki_table_name;",
expected,
)
.await;
// test content
let expected = r#"[[1730976830000000000,"this is a log message","test","integration"]]"#;
validate_data(
"loki_content",
&client,
"select * from loki_table_name;",
expected,
)
.await;
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())