Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 21:02:58 +00:00
feat: logs query endpoint (#5202)
* define endpoint
* planner
* update lock file
* add unit test
* fix toml format
* revert metric change
* Update src/query/src/log_query/planner.rs
* fix compile
* refactor and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Cargo.lock (generated, 16 changed lines)
@@ -4152,6 +4152,7 @@ dependencies = [
 "futures",
 "humantime-serde",
 "lazy_static",
+"log-query",
 "log-store",
 "meta-client",
 "opentelemetry-proto 0.5.0",

@@ -6122,6 +6123,7 @@ dependencies = [
 "chrono",
 "common-error",
 "common-macro",
+"serde",
 "snafu 0.8.5",
 "table",
 ]

@@ -8160,7 +8162,7 @@ dependencies = [
 "rand",
 "ring 0.17.8",
 "rust_decimal",
-"thiserror 2.0.4",
+"thiserror 2.0.6",
 "tokio",
 "tokio-rustls 0.26.0",
 "tokio-util",

@@ -9098,6 +9100,7 @@ dependencies = [
 "humantime",
 "itertools 0.10.5",
 "lazy_static",
+"log-query",
 "meter-core",
 "meter-macros",
 "num",

@@ -10952,6 +10955,7 @@ dependencies = [
 "json5",
 "jsonb",
 "lazy_static",
+"log-query",
 "loki-api",
 "mime_guess",
 "mysql_async",

@@ -12434,11 +12438,11 @@ dependencies = [

[[package]]
name = "thiserror"
-version = "2.0.4"
+version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490"
+checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47"
dependencies = [
-"thiserror-impl 2.0.4",
+"thiserror-impl 2.0.6",
]

[[package]]

@@ -12454,9 +12458,9 @@ dependencies = [

[[package]]
name = "thiserror-impl"
-version = "2.0.4"
+version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061"
+checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312"
dependencies = [
 "proc-macro2",
 "quote",
@@ -238,6 +238,7 @@ file-engine = { path = "src/file-engine" }
 flow = { path = "src/flow" }
 frontend = { path = "src/frontend", default-features = false }
 index = { path = "src/index" }
+log-query = { path = "src/log-query" }
 log-store = { path = "src/log-store" }
 meta-client = { path = "src/meta-client" }
 meta-srv = { path = "src/meta-srv" }
@@ -25,6 +25,7 @@ pub enum PermissionReq<'a> {
    GrpcRequest(&'a Request),
    SqlStatement(&'a Statement),
    PromQuery,
+    LogQuery,
    Opentsdb,
    LineProtocol,
    PromStoreWrite,
@@ -26,3 +26,4 @@ pub mod function_registry;
 pub mod handlers;
 pub mod helper;
 pub mod state;
+pub mod utils;
@@ -204,20 +204,10 @@ impl PatternAst {
    fn convert_literal(column: &str, pattern: &str) -> Expr {
        logical_expr::col(column).like(logical_expr::lit(format!(
            "%{}%",
-            Self::escape_pattern(pattern)
+            crate::utils::escape_like_pattern(pattern)
        )))
    }

-    fn escape_pattern(pattern: &str) -> String {
-        pattern
-            .chars()
-            .flat_map(|c| match c {
-                '\\' | '%' | '_' => vec!['\\', c],
-                _ => vec![c],
-            })
-            .collect::<String>()
-    }
-
    /// Transform this AST with preset rules to make it correct.
    fn transform_ast(self) -> Result<Self> {
        self.transform_up(Self::collapse_binary_branch_fn)
src/common/function/src/utils.rs (new file, 58 lines)
@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Escapes special characters in the provided pattern string for `LIKE`.
///
/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`)
/// characters with an additional backslash to ensure they are treated literally.
///
/// # Examples
///
/// ```rust
/// let escaped = escape_like_pattern("100%_some\\path");
/// assert_eq!(escaped, "100\\%\\_some\\\\path");
/// ```
pub fn escape_like_pattern(pattern: &str) -> String {
    pattern
        .chars()
        .flat_map(|c| match c {
            '\\' | '%' | '_' => vec!['\\', c],
            _ => vec![c],
        })
        .collect::<String>()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_escape_like_pattern() {
        assert_eq!(
            escape_like_pattern("100%_some\\path"),
            "100\\%\\_some\\\\path"
        );
        assert_eq!(escape_like_pattern(""), "");
        assert_eq!(escape_like_pattern("hello"), "hello");
        assert_eq!(escape_like_pattern("\\%_"), "\\\\\\%\\_");
        assert_eq!(escape_like_pattern("%%__\\\\"), "\\%\\%\\_\\_\\\\\\\\");
        assert_eq!(escape_like_pattern("abc123"), "abc123");
        assert_eq!(escape_like_pattern("%_\\"), "\\%\\_\\\\");
        assert_eq!(
            escape_like_pattern("%%__\\\\another%string"),
            "\\%\\%\\_\\_\\\\\\\\another\\%string"
        );
        assert_eq!(escape_like_pattern("foo%bar_"), "foo\\%bar\\_");
        assert_eq!(escape_like_pattern("\\_\\%"), "\\\\\\_\\\\\\%");
    }
}
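A note on the doubled backslashes above: they are Rust source-level escapes, so each `\\` in a literal is a single backslash in the actual string. A minimal standalone sketch (the function body copied from the new file) that prints the runtime value:

fn escape_like_pattern(pattern: &str) -> String {
    // Same logic as above: prefix `\`, `%`, and `_` with a backslash.
    pattern
        .chars()
        .flat_map(|c| match c {
            '\\' | '%' | '_' => vec!['\\', c],
            _ => vec![c],
        })
        .collect::<String>()
}

fn main() {
    // The source literal "100%_some\\path" is the runtime string 100%_some\path.
    let escaped = escape_like_pattern("100%_some\\path");
    // Prints 100\%\_some\\path: every special character gains one leading backslash.
    println!("{escaped}");
}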
@@ -41,6 +41,7 @@ datafusion-expr.workspace = true
 datanode.workspace = true
 humantime-serde.workspace = true
 lazy_static.workspace = true
+log-query.workspace = true
 log-store.workspace = true
 meta-client.workspace = true
 opentelemetry-proto.workspace = true
@@ -16,6 +16,7 @@ pub mod builder;
 mod grpc;
 mod influxdb;
 mod log_handler;
+mod logs;
 mod opentsdb;
 mod otlp;
 mod prom_store;
@@ -64,8 +65,8 @@ use servers::prometheus_handler::PrometheusHandler;
 use servers::query_handler::grpc::GrpcQueryHandler;
 use servers::query_handler::sql::SqlQueryHandler;
 use servers::query_handler::{
-    InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
-    PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
+    InfluxdbLineProtocolHandler, LogQueryHandler, OpenTelemetryProtocolHandler,
+    OpentsdbProtocolHandler, PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
 };
 use servers::server::ServerHandlers;
 use session::context::QueryContextRef;
@@ -99,6 +100,7 @@ pub trait FrontendInstance:
    + ScriptHandler
    + PrometheusHandler
    + PipelineHandler
+    + LogQueryHandler
    + Send
    + Sync
    + 'static
src/frontend/src/instance/logs.rs (new file, 67 lines)
@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use log_query::LogQuery;
use server_error::Result as ServerResult;
use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
use servers::query_handler::LogQueryHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use tonic::async_trait;

use super::Instance;

#[async_trait]
impl LogQueryHandler for Instance {
    async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
        let interceptor = self
            .plugins
            .get::<LogQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::LogQuery)
            .context(AuthSnafu)?;

        interceptor.as_ref().pre_query(&request, ctx.clone())?;

        request
            .time_filter
            .canonicalize()
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        let plan = self
            .query_engine
            .planner()
            .plan_logs_query(request, ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        let output = self
            .statement_executor
            .exec_plan(plan, ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
    }
}
@@ -87,6 +87,7 @@ where
    let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
    builder =
        builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
+    builder = builder.with_logs_handler(self.instance.clone());

    if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
        builder = builder.with_user_provider(user_provider);
@@ -11,5 +11,6 @@ workspace = true
 chrono.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
+serde.workspace = true
 snafu.workspace = true
 table.workspace = true
@@ -15,6 +15,7 @@
 use std::any::Any;

 use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use snafu::Snafu;

@@ -41,6 +42,15 @@ impl ErrorExt for Error {
    fn as_any(&self) -> &dyn Any {
        self
    }
+
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Error::InvalidTimeFilter { .. }
+            | Error::InvalidDateFormat { .. }
+            | Error::InvalidSpanFormat { .. }
+            | Error::EndBeforeStart { .. } => StatusCode::InvalidArguments,
+        }
+    }
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -13,6 +13,7 @@
 // limitations under the License.

 use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
+use serde::{Deserialize, Serialize};
 use table::table_name::TableName;

 use crate::error::{

@@ -21,9 +22,10 @@ use crate::error::{
 };

 /// GreptimeDB's log query request.
+#[derive(Debug, Serialize, Deserialize)]
 pub struct LogQuery {
     /// A fully qualified table name to query logs from.
-    pub table_name: TableName,
+    pub table: TableName,
     /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
     pub time_filter: TimeFilter,
     /// Columns with filters to query.

@@ -34,6 +36,18 @@ pub struct LogQuery {
     pub context: Context,
 }

+impl Default for LogQuery {
+    fn default() -> Self {
+        Self {
+            table: TableName::new("", "", ""),
+            time_filter: Default::default(),
+            columns: vec![],
+            limit: None,
+            context: Default::default(),
+        }
+    }
+}
+
 /// Represents a time range for log query.
 ///
 /// This struct allows various formats to express a time range from the user side

@@ -58,7 +72,7 @@ pub struct LogQuery {
 ///
 /// This struct doesn't require a timezone to be presented. When the timezone is not
 /// provided, it will fill the default timezone with the same rules akin to other queries.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct TimeFilter {
     pub start: Option<String>,
     pub end: Option<String>,

@@ -69,8 +83,7 @@ impl TimeFilter {
     /// Validate and canonicalize the time filter.
     ///
     /// This function will try to fill the missing fields and convert all dates to timestamps
-    // false positive
-    #[allow(unused_assignments)]
+    #[allow(unused_assignments)] // false positive
     pub fn canonicalize(&mut self) -> Result<()> {
         let mut start_dt = None;
         let mut end_dt = None;

@@ -209,6 +222,7 @@ impl TimeFilter {
 }

 /// Represents a column with filters to query.
+#[derive(Debug, Serialize, Deserialize)]
 pub struct ColumnFilters {
     /// Case-sensitive column name to query.
     pub column_name: String,

@@ -216,6 +230,7 @@ pub struct ColumnFilters {
     pub filters: Vec<ContentFilter>,
 }

+#[derive(Debug, Serialize, Deserialize)]
 pub enum ContentFilter {
     /// Only match the exact content.
     ///

@@ -234,13 +249,16 @@ pub enum ContentFilter {
     Compound(Vec<ContentFilter>, BinaryOperator),
 }

+#[derive(Debug, Serialize, Deserialize)]
 pub enum BinaryOperator {
     And,
     Or,
 }

 /// Controls how many adjacent lines to return.
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub enum Context {
+    #[default]
     None,
     /// Specify the number of lines before and after the matched line separately.
     Lines(usize, usize),
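Since `LogQuery` and its nested types now derive `Serialize`/`Deserialize`, the HTTP endpoint added later in this commit can take the request as a JSON body. A minimal sketch of building the same request programmatically and dumping its wire shape; it assumes `serde_json` is available as a dependency, and the printed JSON is simply whatever serde's default (externally tagged) enum representation produces here:

use log_query::{ColumnFilters, ContentFilter, Context, LogQuery, TimeFilter};
use table::table_name::TableName;

fn main() {
    // Mirrors the request built in the planner's unit test.
    let query = LogQuery {
        table: TableName::new("greptime", "public", "test_table"),
        time_filter: TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        },
        columns: vec![ColumnFilters {
            column_name: "message".to_string(),
            filters: vec![ContentFilter::Contains("error".to_string())],
        }],
        limit: Some(100),
        context: Context::None,
    };
    // Round-trips through JSON thanks to the new derives.
    println!("{}", serde_json::to_string_pretty(&query).unwrap());
}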
@@ -46,6 +46,7 @@ greptime-proto.workspace = true
 humantime.workspace = true
 itertools.workspace = true
 lazy_static.workspace = true
+log-query.workspace = true
 meter-core.workspace = true
 meter-macros.workspace = true
 object-store.workspace = true
@@ -17,6 +17,7 @@
 #![feature(trait_upcasting)]
 #![feature(try_blocks)]
 #![feature(stmt_expr_attributes)]
+#![feature(iterator_try_collect)]

 mod analyze;
 pub mod dataframe;

@@ -25,6 +26,7 @@ pub mod dist_plan;
 pub mod dummy_catalog;
 pub mod error;
 pub mod executor;
+pub mod log_query;
 pub mod metrics;
 mod optimizer;
 pub mod parser;
src/query/src/log_query.rs (new file, 16 lines)
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod error;
pub mod planner;
src/query/src/log_query/error.rs (new file, 84 lines)
@@ -0,0 +1,84 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::error::DataFusionError;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display("General catalog error"))]
    Catalog {
        #[snafu(implicit)]
        location: Location,
        source: catalog::error::Error,
    },

    #[snafu(display("Internal error during building DataFusion plan"))]
    DataFusionPlanning {
        #[snafu(source)]
        error: datafusion::error::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unknown table type, downcast failed"))]
    UnknownTable {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Cannot find time index column"))]
    TimeIndexNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unimplemented feature: {}", feature))]
    Unimplemented {
        #[snafu(implicit)]
        location: Location,
        feature: String,
    },
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        use Error::*;
        match self {
            Catalog { source, .. } => source.status_code(),
            DataFusionPlanning { .. } => StatusCode::External,
            UnknownTable { .. } | TimeIndexNotFound { .. } => StatusCode::Internal,
            Unimplemented { .. } => StatusCode::Unsupported,
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for DataFusionError {
    fn from(err: Error) -> Self {
        DataFusionError::External(Box::new(err))
    }
}
src/query/src/log_query/planner.rs (new file, 371 lines)
@@ -0,0 +1,371 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use catalog::table_source::DfTableSourceProvider;
use common_function::utils::escape_like_pattern;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::ScalarValue;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{ColumnFilters, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

use crate::log_query::error::{
    CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnimplementedSnafu,
    UnknownTableSnafu,
};

const DEFAULT_LIMIT: usize = 1000;

pub struct LogQueryPlanner {
    table_provider: DfTableSourceProvider,
}

impl LogQueryPlanner {
    pub fn new(table_provider: DfTableSourceProvider) -> Self {
        Self { table_provider }
    }

    pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
        // Resolve table
        let table_ref: TableReference = query.table.table_ref().into();
        let table_source = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;
        let schema = table_source
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema();

        // Build the initial scan plan
        let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
            .context(DataFusionPlanningSnafu)?;

        // Collect filter expressions
        let mut filters = Vec::new();

        // Time filter
        filters.push(self.build_time_filter(&query.time_filter, &schema)?);

        // Column filters and projections
        let mut projected_columns = Vec::new();
        for column_filter in &query.columns {
            if let Some(expr) = self.build_column_filter(column_filter)? {
                filters.push(expr);
            }
            projected_columns.push(col(&column_filter.column_name));
        }

        // Apply filters
        if !filters.is_empty() {
            let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
            plan_builder = plan_builder
                .filter(filter_expr)
                .context(DataFusionPlanningSnafu)?;
        }

        // Apply projections
        plan_builder = plan_builder
            .project(projected_columns)
            .context(DataFusionPlanningSnafu)?;

        // Apply limit
        plan_builder = plan_builder
            .limit(0, query.limit.or(Some(DEFAULT_LIMIT)))
            .context(DataFusionPlanningSnafu)?;

        // Build the final plan
        let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;

        Ok(plan)
    }

    fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
        let timestamp_col = schema
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {})?
            .name
            .clone();

        let start_time = ScalarValue::Utf8(time_filter.start.clone());
        let end_time = ScalarValue::Utf8(
            time_filter
                .end
                .clone()
                .or(Some("9999-12-31T23:59:59Z".to_string())),
        );
        let expr = col(timestamp_col.clone())
            .gt_eq(lit(start_time))
            .and(col(timestamp_col).lt_eq(lit(end_time)));

        Ok(expr)
    }

    /// Returns filter expressions
    fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> {
        if column_filter.filters.is_empty() {
            return Ok(None);
        }

        let exprs = column_filter
            .filters
            .iter()
            .map(|filter| match filter {
                log_query::ContentFilter::Exact(pattern) => Ok(col(&column_filter.column_name)
                    .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern)))))),
                log_query::ContentFilter::Prefix(pattern) => Ok(col(&column_filter.column_name)
                    .like(lit(ScalarValue::Utf8(Some(format!(
                        "{}%",
                        escape_like_pattern(pattern)
                    )))))),
                log_query::ContentFilter::Postfix(pattern) => Ok(col(&column_filter.column_name)
                    .like(lit(ScalarValue::Utf8(Some(format!(
                        "%{}",
                        escape_like_pattern(pattern)
                    )))))),
                log_query::ContentFilter::Contains(pattern) => Ok(col(&column_filter.column_name)
                    .like(lit(ScalarValue::Utf8(Some(format!(
                        "%{}%",
                        escape_like_pattern(pattern)
                    )))))),
                log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
                    UnimplementedSnafu {
                        feature: "regex filter",
                    }
                    .build(),
                ),
                log_query::ContentFilter::Compound(..) => Err::<Expr, _>(
                    UnimplementedSnafu {
                        feature: "compound filter",
                    }
                    .build(),
                ),
            })
            .try_collect::<Vec<_>>()?;

        Ok(conjunction(exprs))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use catalog::memory::MemoryCatalogManager;
    use catalog::RegisterTableRequest;
    use common_catalog::consts::DEFAULT_CATALOG_NAME;
    use common_query::test_util::DummyDecoder;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SchemaRef};
    use log_query::{ContentFilter, Context};
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::table_name::TableName;
    use table::test_util::EmptyTable;

    use super::*;

    fn mock_schema() -> SchemaRef {
        let columns = vec![
            ColumnSchema::new(
                "message".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "host".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
        ];

        Arc::new(Schema::new(columns))
    }

    /// Registers table under `greptime`, with `message` and `timestamp` and `host` columns.
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let schema = mock_schema();
            let table_meta = TableMetaBuilder::default()
                .schema(schema)
                .primary_key_indices(vec![2])
                .value_indices(vec![0])
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.to_string())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: schema_name.to_string(),
                    table_name: table_name.to_string(),
                    table_id: 1024,
                    table,
                })
                .unwrap();
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

    #[tokio::test]
    async fn test_query_to_plan() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let mut planner = LogQueryPlanner::new(table_provider);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            columns: vec![ColumnFilters {
                column_name: "message".to_string(),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }],
            limit: Some(100),
            context: Context::None,
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Limit: skip=0, fetch=100 [message:Utf8]\
        \n  Projection: greptime.public.test_table.message [message:Utf8]\
        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn test_build_time_filter() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let planner = LogQueryPlanner::new(table_provider);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-02T00:00:00Z".to_string(),
            )))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }

    #[tokio::test]
    async fn test_build_time_filter_without_end() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let planner = LogQueryPlanner::new(table_provider);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: None,
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "9999-12-31T23:59:59Z".to_string(),
            )))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }

    #[tokio::test]
    async fn test_build_column_filter() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let planner = LogQueryPlanner::new(table_provider);

        let column_filter = ColumnFilters {
            column_name: "message".to_string(),
            filters: vec![
                ContentFilter::Contains("error".to_string()),
                ContentFilter::Prefix("WARN".to_string()),
            ],
        };

        let expr_option = planner.build_column_filter(&column_filter).unwrap();
        assert!(expr_option.is_some());

        let expr = expr_option.unwrap();

        let expected_expr = col("message")
            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }

    #[test]
    fn test_escape_pattern() {
        assert_eq!(escape_like_pattern("test"), "test");
        assert_eq!(escape_like_pattern("te%st"), "te\\%st");
        assert_eq!(escape_like_pattern("te_st"), "te\\_st");
        assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
    }
}
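One subtlety in build_column_filter: ContentFilter::Exact emits a LIKE with no wildcards, so after escaping, LIKE degenerates to a literal whole-value match. A sketch (using only datafusion_expr, with a hypothetical helper name) of the expressions the Exact, Prefix, and Contains variants produce once a pattern has been escaped:

use datafusion_expr::{col, lit, Expr};

// Hypothetical helper mirroring the planner's three simple variants.
fn sketch_filters(column: &str, escaped: &str) -> Vec<Expr> {
    vec![
        // Exact: no wildcards, matches the whole value literally.
        col(column).like(lit(escaped.to_string())),
        // Prefix: trailing `%`.
        col(column).like(lit(format!("{escaped}%"))),
        // Contains: wrapped in `%`.
        col(column).like(lit(format!("%{escaped}%"))),
    ]
}

fn main() {
    // `err%` escapes to `err\%`, so the literal percent no longer acts as a wildcard.
    for expr in sketch_filters("message", "err\\%") {
        println!("{expr}");
    }
}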
@@ -24,6 +24,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::sql::planner::PlannerContext;
 use datafusion_expr::{Expr as DfExpr, LogicalPlan};
 use datafusion_sql::planner::{ParserOptions, SqlToRel};
+use log_query::LogQuery;
 use promql_parser::parser::EvalStmt;
 use session::context::QueryContextRef;
 use snafu::ResultExt;

@@ -31,6 +32,7 @@ use sql::ast::Expr as SqlExpr;
 use sql::statements::statement::Statement;

 use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
+use crate::log_query::planner::LogQueryPlanner;
 use crate::parser::QueryStatement;
 use crate::promql::planner::PromPlanner;
 use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};

@@ -41,6 +43,12 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
 pub trait LogicalPlanner: Send + Sync {
     async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

+    async fn plan_logs_query(
+        &self,
+        query: LogQuery,
+        query_ctx: QueryContextRef,
+    ) -> Result<LogicalPlan>;
+
     fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

     fn as_any(&self) -> &dyn Any;

@@ -182,6 +190,34 @@ impl LogicalPlanner for DfLogicalPlanner {
        }
    }

+    async fn plan_logs_query(
+        &self,
+        query: LogQuery,
+        query_ctx: QueryContextRef,
+    ) -> Result<LogicalPlan> {
+        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
+            self.session_state.clone(),
+            &query_ctx,
+        )?);
+        let table_provider = DfTableSourceProvider::new(
+            self.engine_state.catalog_manager().clone(),
+            self.engine_state.disallow_cross_catalog_query(),
+            query_ctx,
+            plan_decoder,
+            self.session_state
+                .config_options()
+                .sql_parser
+                .enable_ident_normalization,
+        );
+
+        let mut planner = LogQueryPlanner::new(table_provider);
+        planner
+            .query_to_plan(query)
+            .await
+            .map_err(BoxedError::new)
+            .context(QueryPlanSnafu)
+    }
+
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.optimize_logical_plan(plan)
    }
@@ -66,6 +66,7 @@ itertools.workspace = true
 json5 = "0.4"
 jsonb.workspace = true
 lazy_static.workspace = true
+log-query.workspace = true
 loki-api = "0.1"
 mime_guess = "2.0"
 notify.workspace = true
@@ -66,8 +66,8 @@ use crate::metrics_handler::MetricsHandler;
 use crate::prometheus_handler::PrometheusHandlerRef;
 use crate::query_handler::sql::ServerSqlQueryHandlerRef;
 use crate::query_handler::{
-    InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef,
-    PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
+    InfluxdbLineProtocolHandlerRef, LogQueryHandlerRef, OpenTelemetryProtocolHandlerRef,
+    OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
 };
 use crate::server::Server;
@@ -80,6 +80,7 @@ mod extractor;
 pub mod handler;
 pub mod header;
 pub mod influxdb;
+pub mod logs;
 pub mod mem_prof;
 pub mod opentsdb;
 pub mod otlp;

@@ -506,6 +507,17 @@ impl HttpServerBuilder {
        }
    }

+    pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
+        let logs_router = HttpServer::route_logs(logs_handler);
+
+        Self {
+            router: self
+                .router
+                .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
+            ..self
+        }
+    }
+
    pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
        Self {
            router: self.router.nest(

@@ -770,6 +782,12 @@ impl HttpServer {
            .with_state(api_state)
    }

+    fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
+        Router::new()
+            .route("/logs", routing::get(logs::logs).post(logs::logs))
+            .with_state(log_handler)
+    }
+
    /// Route Prometheus [HTTP API].
    ///
    /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
src/servers/src/http/logs.rs (new file, 50 lines)
@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use axum::extract::State;
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use common_telemetry::tracing;
use log_query::LogQuery;
use session::context::{Channel, QueryContext};

use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::query_handler::LogQueryHandlerRef;

#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "logs"))]
pub async fn logs(
    State(handler): State<LogQueryHandlerRef>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Json(params): Json<LogQuery>,
) -> Response {
    let exec_start = Instant::now();
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let output = handler.query(params, query_ctx).await;
    let resp = GreptimedbV1Response::from_output(vec![output]).await;

    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
        .into_response()
}
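For completeness, a sketch of exercising the new endpoint from a client. Everything here is assumption-laden: it takes HTTP_API_VERSION to be v1 (so the route is POST /v1/logs), assumes a server on 127.0.0.1:4000, guesses the JSON field names of TableName, relies on serde's default enum representation, and uses reqwest purely for illustration:

// Hypothetical client call; the JSON body mirrors the `LogQuery` shape above.
// Requires the `reqwest` crate with its `json` feature, plus `tokio` and `serde_json`.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = serde_json::json!({
        "table": {
            "catalog_name": "greptime",
            "schema_name": "public",
            "table_name": "test_table"
        },
        "time_filter": { "start": "2021-01-01T00:00:00Z", "end": null, "span": null },
        "columns": [
            { "column_name": "message", "filters": [ { "Contains": "error" } ] }
        ],
        "limit": 100,
        "context": "None"
    });
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/logs")
        .json(&body)
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}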
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use common_error::ext::ErrorExt;
 use common_query::Output;
 use datafusion_expr::LogicalPlan;
+use log_query::LogQuery;
 use query::parser::PromQuery;
 use serde_json::Value;
 use session::context::QueryContextRef;

@@ -458,3 +459,54 @@ where
        }
    }
}
+
+/// LogQueryInterceptor can track the life cycle of a log query request
+/// and customize or abort its execution at a given point.
+pub trait LogQueryInterceptor {
+    type Error: ErrorExt;
+
+    /// Called before the query is actually executed.
+    fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
+        Ok(())
+    }
+
+    /// Called after execution finishes. The implementation can modify the
+    /// output if needed.
+    fn post_query(
+        &self,
+        output: Output,
+        _query_ctx: QueryContextRef,
+    ) -> Result<Output, Self::Error> {
+        Ok(output)
+    }
+}
+
+pub type LogQueryInterceptorRef<E> =
+    Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>;
+
+impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>>
+where
+    E: ErrorExt,
+{
+    type Error = E;
+
+    fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
+        if let Some(this) = self {
+            this.pre_query(query, query_ctx)
+        } else {
+            Ok(())
+        }
+    }
+
+    fn post_query(
+        &self,
+        output: Output,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output, Self::Error> {
+        if let Some(this) = self {
+            this.post_query(output, query_ctx)
+        } else {
+            Ok(output)
+        }
+    }
+}
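As a sketch of how a plugin could hook in, a minimal interceptor under the new trait that only audits incoming queries; the error type is borrowed from servers::error for illustration and nothing here is prescribed by the commit:

use log_query::LogQuery;
use servers::interceptor::LogQueryInterceptor;
use session::context::QueryContextRef;

// Hypothetical interceptor that logs each query and lets it proceed.
struct AuditInterceptor;

impl LogQueryInterceptor for AuditInterceptor {
    type Error = servers::error::Error;

    fn pre_query(&self, query: &LogQuery, _ctx: QueryContextRef) -> Result<(), Self::Error> {
        // `LogQuery` derives `Debug`, so it can be dumped directly.
        common_telemetry::info!("log query received: {:?}", query);
        Ok(())
    }

    // `post_query` keeps the default implementation, returning the output unchanged.
}

// Registered into the plugins map as a
// `LogQueryInterceptorRef<servers::error::Error>`, it would be picked up by the
// `plugins.get::<LogQueryInterceptorRef<_>>()` call in `Instance::query` shown earlier.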
@@ -72,6 +72,14 @@ lazy_static! {
        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
    )
    .unwrap();
+    /// Http logs query duration per database.
+    pub static ref METRIC_HTTP_LOGS_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_servers_http_logs_elapsed",
+        "servers http logs elapsed",
+        &[METRIC_DB_LABEL],
+        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
+    )
+    .unwrap();
    pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_auth_failure_count",
        "servers auth failure count",
@@ -34,6 +34,7 @@ use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_query::Output;
 use headers::HeaderValue;
+use log_query::LogQuery;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

@@ -52,6 +53,7 @@ pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send +
 pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
 pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
 pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
+pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;

 #[async_trait]
 pub trait ScriptHandler {

@@ -174,3 +176,9 @@ pub trait PipelineHandler {
    /// Build a pipeline from a string.
    fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>;
}
+
+/// Handle log query requests.
+#[async_trait]
+pub trait LogQueryHandler {
+    async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
+}