feat: log ingestion support (#4014)

* chore: add log http ingester scaffold

* chore: add some example code

* chore: add log inserter

* chore: add log handler file

* chore: add pipeline lib

* chore: import log handler

* chore: add pipeline http handler

* chore: add pipeline private table

* chore: add pipeline API

* chore: improve error handling

* chore: merge main

* chore: add multi content type support for log handler

* refactor: remove servers dep on pipeline

* refactor: move define_into_tonic_status to common-error

* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4

* chore: fix typo

* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c

* chore: fix typo and license header

* refactor: move http event handler to a separate file

* chore: add test for pipeline

* chore: fmt

* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93

* refactor: move `pipeline_operator` to `pipeline` crate

* chore: minor update

* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4

* chore: add log

* chore: add log

* chore: remove open hook

* chore: minor update

* chore: fix fmt

* chore: minor update

* chore: rename desc for pipeline table

* refactor: remove updated_at in pipelines

* chore: add more content type support for log inserter api

* chore: introduce pipeline crate

* chore: update upload pipeline api

* chore: fix by PR comment

* chore: add some doc for pub fn/struct

* chore: some minor fixes

* chore: add pipeline version support

* chore: impl log pipeline version

* chore: fix format issue

* fix: make the LogicalPlan of a query pipeline sorted in desc order

* chore: remove some debug log

* chore: replace hashmap cache with moka

* chore: fix by PR comment

* chore: fix toml format issue

* chore: update Cargo.lock

* chore: fix by PR comment

* chore: fix some issues by PR comment

* chore: add more doc for pipeline version

---------

Co-authored-by: shuiyisong <xixing.sys@gmail.com>
localhost authored 2024-06-15 01:03:30 +08:00, committed by GitHub
parent bf3ad44584
commit 01e3a24cf7
22 changed files with 1613 additions and 52 deletions
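
For orientation before the file-by-file diff: this PR adds two HTTP endpoints under `/{HTTP_API_VERSION}/events`: `POST .../pipelines/:pipeline_name` uploads a YAML pipeline definition, and `POST .../logs` ingests log payloads through a named pipeline into a target table. Below is a minimal client sketch, not part of the patch, assuming the API version prefix resolves to `v1`, the server listens on the default HTTP port 4000, and `my_pipeline` and `my_logs` are placeholder names; the YAML body is only a stand-in for a real pipeline definition. It uses the `reqwest` and `tokio` crates.

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();
    // Assumption: HTTP_API_VERSION resolves to "v1" and the server listens on
    // the default HTTP port 4000.
    let base = "http://127.0.0.1:4000/v1/events";

    // Upload a pipeline definition named `my_pipeline`. The body must be YAML;
    // the content here is only a placeholder.
    let pipeline_yaml = "# <pipeline definition in YAML>";
    client
        .post(format!("{base}/pipelines/my_pipeline"))
        .header("Content-Type", "application/x-yaml")
        .body(pipeline_yaml.to_string())
        .send()
        .await?
        .error_for_status()?;

    // Ingest NDJSON logs into table `my_logs` through that pipeline.
    let logs = "{\"time\":\"2024-06-15 00:00:00\",\"message\":\"hello\"}\n\
                {\"time\":\"2024-06-15 00:00:01\",\"message\":\"world\"}";
    client
        .post(format!(
            "{base}/logs?pipeline_name=my_pipeline&table=my_logs"
        ))
        .header("Content-Type", "application/json")
        .body(logs.to_string())
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}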

Cargo.lock (generated)

@@ -339,7 +339,7 @@ dependencies = [
"arrow-data",
"arrow-schema",
"chrono",
"chrono-tz",
"chrono-tz 0.8.6",
"half 2.4.1",
"hashbrown 0.14.5",
"num",
@@ -771,6 +771,7 @@ dependencies = [
"matchit",
"memchr",
"mime",
"multer",
"percent-encoding",
"pin-project-lite",
"rustversion",
@@ -1388,7 +1389,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
dependencies = [
"chrono",
"chrono-tz-build",
"chrono-tz-build 0.2.1",
"phf",
]
[[package]]
name = "chrono-tz"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb"
dependencies = [
"chrono",
"chrono-tz-build 0.3.0",
"phf",
]
@@ -1403,6 +1415,17 @@ dependencies = [
"phf_codegen",
]
[[package]]
name = "chrono-tz-build"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1"
dependencies = [
"parse-zoneinfo",
"phf",
"phf_codegen",
]
[[package]]
name = "chunked_transfer"
version = "1.5.0"
@@ -2141,7 +2164,7 @@ version = "0.8.2"
dependencies = [
"arrow",
"chrono",
"chrono-tz",
"chrono-tz 0.8.6",
"common-error",
"common-macro",
"once_cell",
@@ -3544,6 +3567,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]
[[package]]
name = "endian-type"
version = "0.1.2"
@@ -3913,6 +3945,7 @@ dependencies = [
"common-time",
"common-version",
"datanode",
"datatypes",
"futures",
"humantime-serde",
"lazy_static",
@@ -3922,12 +3955,14 @@ dependencies = [
"opentelemetry-proto 0.5.0",
"operator",
"partition",
"pipeline",
"prometheus",
"prost 0.12.6",
"query",
"raft-engine",
"script",
"serde",
"serde_json",
"servers",
"session",
"snafu 0.8.3",
@@ -5941,6 +5976,24 @@ dependencies = [
"rand_core",
]
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]
[[package]]
name = "multimap"
version = "0.8.3"
@@ -6756,7 +6809,7 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"chrono-tz",
"chrono-tz 0.8.6",
"datafusion 37.1.0",
"datafusion-expr 37.1.0",
"datafusion-physical-expr 37.1.0",
@@ -7229,6 +7282,58 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.8.2"
dependencies = [
"api",
"arrow",
"async-trait",
"catalog",
"chrono",
"chrono-tz 0.9.0",
"common-catalog",
"common-error",
"common-function",
"common-macro",
"common-meta",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-time",
"crossbeam-utils",
"csv",
"datafusion 38.0.0",
"datafusion-common 38.0.0",
"datafusion-expr 38.0.0",
"datafusion-functions 38.0.0",
"datafusion-physical-expr 38.0.0",
"datatypes",
"futures",
"greptime-proto",
"itertools 0.10.5",
"lazy_static",
"moka",
"once_cell",
"operator",
"paste",
"prometheus",
"query",
"rayon",
"regex",
"ron",
"serde",
"serde_json",
"session",
"snafu 0.8.3",
"sql",
"table",
"tokio",
"urlencoding",
"yaml-rust",
]
[[package]]
name = "pkcs1"
version = "0.3.3"
@@ -9637,6 +9742,7 @@ dependencies = [
"permutation",
"pgwire",
"pin-project",
"pipeline",
"postgres-types",
"pprof",
"prometheus",


@@ -46,6 +46,7 @@ members = [
"src/object-store",
"src/operator",
"src/partition",
"src/pipeline",
"src/plugins",
"src/promql",
"src/puffin",
@@ -224,6 +225,7 @@ mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
partition = { path = "src/partition" }
pipeline = { path = "src/pipeline" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }


@@ -30,6 +30,7 @@ pub enum PermissionReq<'a> {
PromStoreWrite,
PromStoreRead,
Otlp,
LogWrite,
}
#[derive(Debug)]


@@ -44,6 +44,7 @@ meta-client.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
partition.workspace = true
pipeline.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
@@ -62,11 +63,13 @@ toml.workspace = true
tonic.workspace = true
[dev-dependencies]
catalog.workspace = true
catalog = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures = "0.3"
meta-srv = { workspace = true, features = ["mock"] }
serde_json.workspace = true
strfmt = "0.2"
tower.workspace = true
uuid.workspace = true


@@ -15,6 +15,7 @@
pub mod builder;
mod grpc;
mod influxdb;
mod log_handler;
mod opentsdb;
mod otlp;
mod prom_store;
@@ -48,6 +49,7 @@ use meta_client::MetaClientOptions;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
@@ -66,7 +68,7 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
@@ -100,6 +102,7 @@ pub trait FrontendInstance:
+ OpenTelemetryProtocolHandler
+ ScriptHandler
+ PrometheusHandler
+ LogHandler
+ Send
+ Sync
+ 'static
@@ -108,12 +111,12 @@ pub trait FrontendInstance:
}
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
pub type StatementExecutorRef = Arc<StatementExecutor>;
#[derive(Clone)]
pub struct Instance {
catalog_manager: CatalogManagerRef,
script_executor: Arc<ScriptExecutor>,
pipeline_operator: Arc<PipelineOperator>,
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
plugins: Plugins,


@@ -27,9 +27,10 @@ use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::StatementExecutor;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::QueryEngineFactory;
use servers::server::ServerHandlers;
use snafu::OptionExt;
@@ -37,7 +38,7 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::heartbeat::HeartbeatTask;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::{Instance, StatementExecutorRef};
use crate::instance::Instance;
use crate::script::ScriptExecutor;
/// The frontend [`Instance`] builder.
@@ -172,11 +173,19 @@ impl FrontendBuilder {
table_route_cache,
));
let pipeline_operator = Arc::new(PipelineOperator::new(
inserter.clone(),
statement_executor.clone(),
self.catalog_manager.clone(),
query_engine.clone(),
));
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
Ok(Instance {
catalog_manager: self.catalog_manager,
script_executor,
pipeline_operator,
statement_executor,
query_engine,
plugins,


@@ -0,0 +1,93 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::table::PipelineVersion;
use pipeline::{GreptimeTransformer, Pipeline};
use servers::error::{
AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
UnsupportedDeletePipelineSnafu,
};
use servers::query_handler::LogHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::instance::Instance;
#[async_trait]
impl LogHandler for Instance {
async fn insert_logs(
&self,
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::LogWrite)
.context(AuthSnafu)?;
self.handle_log_inserts(log, ctx).await
}
async fn get_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> ServerResult<Arc<Pipeline<GreptimeTransformer>>> {
self.pipeline_operator
.get_pipeline(query_ctx, name, version)
.await
.context(PipelineSnafu)
}
async fn insert_pipeline(
&self,
name: &str,
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
) -> ServerResult<()> {
self.pipeline_operator
.insert_pipeline(name, content_type, pipeline, query_ctx)
.await
.context(PipelineSnafu)
}
async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
// TODO(qtang): impl delete
Err(UnsupportedDeletePipelineSnafu {}.build())
}
}
impl Instance {
pub async fn handle_log_inserts(
&self,
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
}
}


@@ -90,6 +90,8 @@ where
Some(self.instance.clone()),
);
builder = builder.with_log_ingest_handler(self.instance.clone());
if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
}


@@ -66,6 +66,22 @@ pub struct Inserter {
pub type InserterRef = Arc<Inserter>;
enum AutoCreateTableType {
Logical(String),
Physical,
Log,
}
impl AutoCreateTableType {
fn as_str(&self) -> &'static str {
match self {
AutoCreateTableType::Logical(_) => "logical",
AutoCreateTableType::Physical => "physical",
AutoCreateTableType::Log => "log",
}
}
}
impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
@@ -108,7 +124,42 @@ impl Inserter {
validate_column_count_match(&requests)?;
let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Physical,
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
pub async fn handle_log_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;
let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Log,
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
@@ -143,7 +194,7 @@ impl Inserter {
.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
)
.await?;
@@ -380,12 +431,15 @@ impl Inserter {
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<HashMap<String, TableId>> {
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut create_tables = vec![];
let mut alter_tables = vec![];
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
@@ -407,42 +461,56 @@ impl Inserter {
}
}
if let Some(on_physical_table) = on_physical_table {
if !create_tables.is_empty() {
// Creates logical tables in batch.
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
match auto_create_table_type {
AutoCreateTableType::Logical(on_physical_table) => {
if !create_tables.is_empty() {
// Creates logical tables in batch.
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
for table in tables {
for table in tables {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
statement_executor
.alter_logical_tables(alter_tables, ctx.clone())
.await?;
}
}
AutoCreateTableType::Physical => {
for req in create_tables {
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
.await?;
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
statement_executor
.alter_logical_tables(alter_tables, ctx.clone())
.await?;
}
} else {
for req in create_tables {
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
.await?;
AutoCreateTableType::Log => {
for req in create_tables {
let table = self.create_log_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
.await?;
}
}
}
Ok(table_name_to_ids)
}
@@ -568,17 +636,45 @@ impl Inserter {
match res {
Ok(table) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
info!("Successfully created table {}", table_ref,);
Ok(table)
}
Err(err) => {
error!(
"Failed to create table {}.{}.{}: {}",
table_ref.catalog, table_ref.schema, table_ref.table, err
);
error!(err; "Failed to create table {}", table_ref);
Err(err)
}
}
}
async fn create_log_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
info!("Table `{table_ref}` does not exist, try creating the log table");
// Set append_mode to true for log tables,
// because they should keep rows with the same ts and tags.
create_table_expr
.table_options
.insert("append_mode".to_string(), "true".to_string());
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(table) => {
info!("Successfully created a log table {}", table_ref);
Ok(table)
}
Err(err) => {
error!(err; "Failed to create a log table {}", table_ref);
Err(err)
}
}


@@ -51,4 +51,10 @@ lazy_static! {
"DDL operator create view"
)
.unwrap();
pub static ref CREATE_ALTER_ON_DEMAND: HistogramVec = register_histogram_vec!(
"greptime_table_operator_create_alter_on_demand",
"table operator duration to create or alter tables on demand",
&["table_type"]
)
.unwrap();
}


@@ -73,6 +73,8 @@ pub struct StatementExecutor {
inserter: InserterRef,
}
pub type StatementExecutorRef = Arc<StatementExecutor>;
impl StatementExecutor {
pub fn new(
catalog_manager: CatalogManagerRef,

src/pipeline/Cargo.toml (new file)

@@ -0,0 +1,62 @@
[package]
name = "pipeline"
edition.workspace = true
version.workspace = true
license.workspace = true
[features]
[lints]
workspace = true
[dependencies]
api.workspace = true
arrow.workspace = true
async-trait.workspace = true
catalog.workspace = true
chrono.workspace = true
chrono-tz = "0.9.0"
common-catalog.workspace = true
common-error.workspace = true
common-function.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
crossbeam-utils.workspace = true
csv = "1.3.0"
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["sync"] }
once_cell.workspace = true
operator.workspace = true
paste.workspace = true
prometheus.workspace = true
query.workspace = true
regex.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
table.workspace = true
tokio.workspace = true
urlencoding = "2.1"
yaml-rust = "0.4"
[dev-dependencies]
catalog = { workspace = true, features = ["testing"] }
rayon = "1.0"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
session = { workspace = true, features = ["testing"] }

src/pipeline/src/lib.rs (new file)

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod etl;
mod manager;
pub use etl::transform::GreptimeTransformer;
pub use etl::value::Value;
pub use etl::{parse, Content, Pipeline};
pub use manager::{error, pipeline_operator, table};


@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Pipeline table not found"))]
PipelineTableNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse pipeline: {}", reason))]
CompilePipeline {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
PipelineNotFound {
name: String,
version: Option<TimestampNanosecond>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch"))]
CollectRecords {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("General catalog error"))]
Catalog {
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create table"))]
CreateTable {
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute pipeline, reason: {}", reason))]
PipelineTransform {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. } | CompilePipeline { .. } | PipelineTransform { .. } => {
StatusCode::InvalidArguments
}
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}


@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod error;
pub mod pipeline_operator;
pub mod table;


@@ -0,0 +1,211 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
use crate::table::{PipelineTable, PipelineTableRef, PipelineVersion};
use crate::{GreptimeTransformer, Pipeline};
pub const PIPELINE_TABLE_NAME: &str = "pipelines";
/// PipelineOperator is responsible for managing pipelines.
/// It provides the ability to:
/// - Create a pipeline table if it does not exist
/// - Get a pipeline from the pipeline table
/// - Insert a pipeline into the pipeline table
/// - Compile a pipeline
/// - Add a pipeline table to the cache
/// - Get a pipeline table from the cache
pub struct PipelineOperator {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
tables: RwLock<HashMap<String, PipelineTableRef>>,
}
impl PipelineOperator {
/// Create a table request for the pipeline table.
pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
let create_table_expr = CreateTableExpr {
catalog_name: catalog.to_string(),
schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
table_name: PIPELINE_TABLE_NAME.to_string(),
desc: "GreptimeDB pipeline table for Log".to_string(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id: None, // Should and will be assigned by Meta.
engine: default_engine().to_string(),
};
RegisterSystemTableRequest {
create_table_expr,
open_hook: None,
}
}
fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
let mut tables = self.tables.write().unwrap();
if tables.contains_key(catalog) {
return;
}
tables.insert(
catalog.to_string(),
Arc::new(PipelineTable::new(
self.inserter.clone(),
self.statement_executor.clone(),
table,
self.query_engine.clone(),
)),
);
}
async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
let catalog = ctx.current_catalog();
// already exists in the cache
if self.get_pipeline_table_from_cache(catalog).is_some() {
return Ok(());
}
let RegisterSystemTableRequest {
create_table_expr: mut expr,
open_hook: _,
} = self.create_table_request(catalog);
// exists in the catalog, just open it
if let Some(table) = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
{
self.add_pipeline_table_to_cache(catalog, table);
return Ok(());
}
// create table
self.statement_executor
.create_table_inner(&mut expr, None, ctx.clone())
.await
.context(CreateTableSnafu)?;
let schema = &expr.schema_name;
let table_name = &expr.table_name;
// get from catalog
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.context(PipelineTableNotFoundSnafu)?;
info!(
"Created pipelines table {} with table id {}.",
table.table_info().full_table_name(),
table.table_info().table_id()
);
// put to cache
self.add_pipeline_table_to_cache(catalog, table);
Ok(())
}
/// Get a pipeline table from the cache.
pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
self.tables.read().unwrap().get(catalog).cloned()
}
async fn insert_and_compile(
&self,
ctx: QueryContextRef,
name: &str,
content_type: &str,
pipeline: &str,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
self.get_pipeline_table_from_cache(ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(ctx.current_schema(), name, content_type, pipeline)
.await
}
}
impl PipelineOperator {
/// Create a new PipelineOperator.
pub fn new(
inserter: InserterRef,
statement_executor: StatementExecutorRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Self {
Self {
inserter,
statement_executor,
catalog_manager,
tables: RwLock::new(HashMap::new()),
query_engine,
}
}
/// Get a pipeline from the pipeline table.
pub async fn get_pipeline(
&self,
query_ctx: QueryContextRef,
name: &str,
version: PipelineVersion,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.get_pipeline(query_ctx.current_schema(), name, version)
.await
}
/// Insert a pipeline into the pipeline table.
pub async fn insert_pipeline(
&self,
name: &str,
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
) -> Result<()> {
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;
self.insert_and_compile(query_ctx, name, content_type, pipeline)
.await
.map(|_| ())
}
}


@@ -0,0 +1,444 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::{and, col, lit};
use datafusion_common::TableReference;
use datafusion_expr::LogicalPlanBuilder;
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
use crate::error::{
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
ExecuteInternalStatementSnafu, InsertPipelineSnafu, PipelineNotFoundSnafu, Result,
};
use crate::etl::transform::GreptimeTransformer;
use crate::etl::{parse, Content, Pipeline};
/// Pipeline version. An optional timestamp with nanosecond precision.
/// If the version is None, it means the latest version of the pipeline.
/// Users can specify a version by providing a timestamp string formatted as ISO 8601.
/// When used in a cache key, it is converted to an i64 representing the number of nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;
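// Illustrative note (not part of the original patch): a request carrying
// `version=2024-06-15T01:03:30Z` in its query string is parsed into
// `Some(TimestampNanosecond(..))`, while omitting the parameter yields `None`
// and selects the latest pipeline. In the cache key built further below this
// becomes either the nanosecond value or the literal suffix `latest`.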
pub type PipelineTableRef = Arc<PipelineTable>;
pub const PIPELINE_TABLE_NAME: &str = "pipelines";
pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
pub const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
pub const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
pub const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
/// Pipeline table cache size.
pub const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Pipeline table cache time to live.
pub const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef,
query_engine: QueryEngineRef,
pipelines: Cache<String, Arc<Pipeline<GreptimeTransformer>>>,
}
impl PipelineTable {
/// Create a new PipelineTable.
pub fn new(
inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef,
query_engine: QueryEngineRef,
) -> Self {
Self {
inserter,
statement_executor,
table,
query_engine,
pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.build(),
}
}
/// Build the schema for the pipeline table.
/// Returns the (time index, primary keys, column) definitions.
pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
(
PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
vec![
PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
],
vec![
ColumnDef {
name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
comment: "".to_string(),
datatype_extension: None,
},
ColumnDef {
name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
comment: "".to_string(),
datatype_extension: None,
},
ColumnDef {
name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
comment: "".to_string(),
datatype_extension: None,
},
ColumnDef {
name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "".to_string(),
datatype_extension: None,
},
ColumnDef {
name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
data_type: ColumnDataType::TimestampNanosecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
comment: "".to_string(),
datatype_extension: None,
},
],
)
}
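// Illustrative sketch (not part of the original patch): the schema built above
// corresponds roughly to the following DDL, assuming DEFAULT_PRIVATE_SCHEMA_NAME
// resolves to `greptime_private`:
//
//   CREATE TABLE IF NOT EXISTS greptime_private.pipelines (
//       name STRING,                        -- tag
//       "schema" STRING,                    -- tag
//       content_type STRING,                -- tag
//       pipeline STRING,                    -- field holding the pipeline body
//       created_at TIMESTAMP TIME INDEX,    -- nanosecond precision
//       PRIMARY KEY ("schema", name, content_type)
//   );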
/// Build the column schemas for inserting a row into the pipeline table.
fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
vec![
PbColumnSchema {
column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
..Default::default()
},
PbColumnSchema {
column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
..Default::default()
},
PbColumnSchema {
column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
..Default::default()
},
PbColumnSchema {
column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
PbColumnSchema {
column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
..Default::default()
},
]
}
fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
QueryContextBuilder::default()
.current_catalog(table_info.catalog_name.to_string())
.current_schema(table_info.schema_name.to_string())
.build()
.into()
}
/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline.into());
parse::<GreptimeTransformer>(&yaml_content)
.map_err(|e| CompilePipelineSnafu { reason: e }.build())
}
fn generate_pipeline_cache_key(schema: &str, name: &str, version: PipelineVersion) -> String {
match version {
Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)),
None => format!("{}/{}/latest", schema, name),
}
}
fn get_compiled_pipeline_from_cache(
&self,
schema: &str,
name: &str,
version: PipelineVersion,
) -> Option<Arc<Pipeline<GreptimeTransformer>>> {
self.pipelines
.get(&Self::generate_pipeline_cache_key(schema, name, version))
}
/// Insert a pipeline into the pipeline table.
async fn insert_pipeline_to_pipeline_table(
&self,
schema: &str,
name: &str,
content_type: &str,
pipeline: &str,
) -> Result<Timestamp> {
let now = Timestamp::current_time(TimeUnit::Nanosecond);
let table_info = self.table.table_info();
let insert = RowInsertRequest {
table_name: PIPELINE_TABLE_NAME.to_string(),
rows: Some(Rows {
schema: Self::build_insert_column_schemas(),
rows: vec![Row {
values: vec![
ValueData::StringValue(name.to_string()).into(),
ValueData::StringValue(schema.to_string()).into(),
ValueData::StringValue(content_type.to_string()).into(),
ValueData::StringValue(pipeline.to_string()).into(),
ValueData::TimestampNanosecondValue(now.value()).into(),
],
}],
}),
};
let requests = RowInsertRequests {
inserts: vec![insert],
};
let output = self
.inserter
.handle_row_inserts(
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
)
.await
.context(InsertPipelineSnafu)?;
info!(
"Inserted pipeline: {} into {} table: {}, output: {:?}.",
name,
PIPELINE_TABLE_NAME,
table_info.full_table_name(),
output
);
Ok(now)
}
/// Get a pipeline by name.
/// If the pipeline is not in the cache, it is fetched from the table, compiled, and inserted into the cache.
pub async fn get_pipeline(
&self,
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
if let Some(pipeline) = self.get_compiled_pipeline_from_cache(schema, name, version) {
return Ok(pipeline);
}
let pipeline = self.find_pipeline_by_name(schema, name, version).await?;
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
self.pipelines.insert(
Self::generate_pipeline_cache_key(schema, name, version),
compiled_pipeline.clone(),
);
Ok(compiled_pipeline)
}
/// Insert a pipeline into the pipeline table and compile it.
/// The compiled pipeline will be inserted into the cache.
pub async fn insert_and_compile(
&self,
schema: &str,
name: &str,
content_type: &str,
pipeline: &str,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
// we will use the version in the future
let version = self
.insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline)
.await?;
{
self.pipelines.insert(
Self::generate_pipeline_cache_key(schema, name, None),
compiled_pipeline.clone(),
);
self.pipelines.insert(
Self::generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
compiled_pipeline.clone(),
);
}
Ok(compiled_pipeline)
}
async fn find_pipeline_by_name(
&self,
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<(String, TimestampNanosecond)> {
let table_info = self.table.table_info();
let table_name = TableReference::full(
table_info.catalog_name.clone(),
table_info.schema_name.clone(),
table_info.name.clone(),
);
let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
let table_source = Arc::new(DefaultTableSource::new(table_provider));
let schema_and_name_filter = and(
col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
);
let filter = if let Some(v) = version {
and(
schema_and_name_filter,
col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())),
)
} else {
schema_and_name_filter
};
let plan = LogicalPlanBuilder::scan(table_name, table_source, None)
.context(BuildDfLogicalPlanSnafu)?
.filter(filter)
.context(BuildDfLogicalPlanSnafu)?
.project(vec![
col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME),
col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME),
])
.context(BuildDfLogicalPlanSnafu)?
.sort(vec![
col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
])
.context(BuildDfLogicalPlanSnafu)?
.limit(0, Some(1))
.context(BuildDfLogicalPlanSnafu)?
.build()
.context(BuildDfLogicalPlanSnafu)?;
debug!("find_pipeline_by_name: plan: {:?}", plan);
let output = self
.query_engine
.execute(LogicalPlan::DfPlan(plan), Self::query_ctx(&table_info))
.await
.context(ExecuteInternalStatementSnafu)?;
let stream = match output.data {
OutputData::Stream(stream) => stream,
OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
_ => unreachable!(),
};
let records = record_util::collect(stream)
.await
.context(CollectRecordsSnafu)?;
ensure!(!records.is_empty(), PipelineNotFoundSnafu { name, version });
ensure!(
records.len() == 1 && records[0].num_columns() == 2,
PipelineNotFoundSnafu { name, version }
);
let pipeline_content_column = records[0].column(0);
let pipeline_content = pipeline_content_column
.as_any()
.downcast_ref::<StringVector>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into string vector",
pipeline_content_column.data_type()
),
})?;
let pipeline_created_at_column = records[0].column(1);
let pipeline_created_at = pipeline_created_at_column
.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into scalar vector",
pipeline_created_at_column.data_type()
),
})?;
debug!(
"find_pipeline_by_name: pipeline_content: {:?}, pipeline_created_at: {:?}",
pipeline_content, pipeline_created_at
);
ensure!(
pipeline_content.len() == 1,
PipelineNotFoundSnafu { name, version }
);
// Safety: asserted above
Ok((
pipeline_content.get_data(0).unwrap().to_string(),
pipeline_created_at.get_data(0).unwrap(),
))
}
}
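// Illustrative sketch (not part of the original patch): the logical plan built
// in `find_pipeline_by_name` corresponds roughly to
//
//   SELECT pipeline, created_at FROM pipelines
//   WHERE "schema" = <schema> AND name = <name>
//     -- plus `AND created_at = <version>` when a version is given
//   ORDER BY created_at DESC
//   LIMIT 1;
//
// which is why the caller expects at most a single row in the collected
// record batches.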


@@ -22,7 +22,7 @@ arrow-ipc.workspace = true
arrow-schema.workspace = true
async-trait = "0.1"
auth.workspace = true
axum.workspace = true
axum = { workspace = true, features = ["multipart"] }
axum-macros = "0.3.8"
base64.workspace = true
bytes.workspace = true
@@ -69,6 +69,7 @@ opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = "0.20"
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
pprof = { version = "0.13", features = [
"flamegraph",


@@ -27,6 +27,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_telemetry::{debug, error};
use datatypes::prelude::ConcreteDataType;
use headers::ContentType;
use query::parser::PromQuery;
use serde_json::json;
use snafu::{Location, Snafu};
@@ -148,6 +149,19 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Pipeline management api error"))]
Pipeline {
source: pipeline::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported delete pipeline."))]
UnsupportedDeletePipeline {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute script by name: {}", name))]
ExecuteScript {
name: String,
@@ -533,6 +547,27 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse payload as json"))]
ParseJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert to structured log"))]
ToStructuredLog {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported content type: {:?}", content_type))]
UnsupportedContentType {
content_type: ContentType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode url"))]
UrlDecode {
#[snafu(source)]
@@ -600,6 +635,7 @@ impl ErrorExt for Error {
| FileWatch { .. } => StatusCode::Internal,
UnsupportedDataType { .. } => StatusCode::Unsupported,
UnsupportedDeletePipeline { .. } => StatusCode::Unsupported,
#[cfg(not(windows))]
UpdateJemallocMetrics { .. } => StatusCode::Internal,
@@ -614,6 +650,8 @@ impl ErrorExt for Error {
| ExecuteGrpcRequest { source, .. }
| CheckDatabaseValidity { source, .. } => source.status_code(),
Pipeline { source, .. } => source.status_code(),
NotSupported { .. }
| InvalidParameter { .. }
| InvalidQuery { .. }
@@ -637,6 +675,9 @@ impl ErrorExt for Error {
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| UnexpectedPhysicalTable { .. }
| ParseJson { .. }
| ToStructuredLog { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. } => StatusCode::InvalidArguments,
RowWriter { source, .. }


@@ -67,12 +67,13 @@ use crate::metrics_handler::MetricsHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef,
PromStoreProtocolHandlerRef, ScriptHandlerRef,
InfluxdbLineProtocolHandlerRef, LogHandlerRef, OpenTelemetryProtocolHandlerRef,
OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
};
use crate::server::Server;
pub mod authorize;
pub mod event;
pub mod handler;
pub mod header;
pub mod influxdb;
@@ -587,6 +588,16 @@ impl HttpServerBuilder {
}
}
pub fn with_log_ingest_handler(self, handler: LogHandlerRef) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(handler),
),
..self
}
}
pub fn with_plugins(self, plugins: Plugins) -> Self {
Self { plugins, ..self }
}
@@ -699,6 +710,21 @@ impl HttpServer {
.with_state(metrics_handler)
}
fn route_log<S>(log_handler: LogHandlerRef) -> Router<S> {
Router::new()
.route("/logs", routing::post(event::log_ingester))
.route(
"/pipelines/:pipeline_name",
routing::post(event::add_pipeline),
)
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.with_state(log_handler)
}
fn route_sql<S>(api_state: ApiState) -> ApiRouter<S> {
ApiRouter::new()
.api_route(


@@ -0,0 +1,257 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::result::Result as StdResult;
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use axum::body::HttpBody;
use axum::extract::{FromRequest, Multipart, Path, Query, State};
use axum::headers::ContentType;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;
use mime_guess::mime;
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
use pipeline::table::PipelineVersion;
use pipeline::Value as PipelineValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Value};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::query_handler::LogHandlerRef;
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
pub table: Option<String>,
pub db: Option<String>,
pub pipeline_name: Option<String>,
pub ignore_errors: Option<bool>,
pub version: Option<String>,
}
pub struct PipelineContent(String);
#[async_trait]
impl<S, B> FromRequest<S, B> for PipelineContent
where
B: HttpBody + Send + 'static,
B::Data: Send,
bytes::Bytes: std::convert::From<<B as HttpBody>::Data>,
B::Error: Into<BoxError>,
S: Send + Sync,
{
type Rejection = Response;
async fn from_request(req: Request<B>, state: &S) -> StdResult<Self, Self::Rejection> {
let content_type_header = req.headers().get(CONTENT_TYPE);
let content_type = content_type_header.and_then(|value| value.to_str().ok());
if let Some(content_type) = content_type {
if content_type.ends_with("yaml") {
let payload = String::from_request(req, state)
.await
.map_err(IntoResponse::into_response)?;
return Ok(Self(payload));
}
if content_type.starts_with("multipart/form-data") {
let mut payload: Multipart = Multipart::from_request(req, state)
.await
.map_err(IntoResponse::into_response)?;
let file = payload
.next_field()
.await
.map_err(IntoResponse::into_response)?;
let payload = file
.ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
.text()
.await
.map_err(IntoResponse::into_response)?;
return Ok(Self(payload));
}
}
Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
}
}
#[axum_macros::debug_handler]
pub async fn add_pipeline(
State(handler): State<LogHandlerRef>,
Path(pipeline_name): Path<String>,
Extension(query_ctx): Extension<QueryContextRef>,
PipelineContent(payload): PipelineContent,
) -> Result<String> {
if pipeline_name.is_empty() {
return Err(InvalidParameterSnafu {
reason: "pipeline_name is required in path",
}
.build());
}
if payload.is_empty() {
return Err(InvalidParameterSnafu {
reason: "pipeline is required in body",
}
.build());
}
let content_type = "yaml";
let result = handler
.insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
.await;
result.map(|_| "ok".to_string()).map_err(|e| {
error!(e; "failed to insert pipeline");
e
})
}
/// Fold a stream of NDJSON values into a single JSON array.
fn transform_ndjson_array_factory(
values: impl IntoIterator<Item = StdResult<Value, serde_json::Error>>,
ignore_error: bool,
) -> Result<Value> {
values.into_iter().try_fold(
Value::Array(Vec::with_capacity(100)),
|acc, item| match acc {
Value::Array(mut acc_array) => {
if let Ok(item_value) = item {
match item_value {
Value::Array(item_array) => {
acc_array.extend(item_array);
}
Value::Object(_) => {
acc_array.push(item_value);
}
_ => {
if !ignore_error {
warn!("invalid item in array: {:?}", item_value);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", item_value),
}
.fail();
}
}
}
Ok(Value::Array(acc_array))
} else if !ignore_error {
item.context(ParseJsonSnafu)
} else {
warn!("invalid item in array: {:?}", item);
Ok(Value::Array(acc_array))
}
}
_ => unreachable!("invalid acc: {:?}", acc),
},
)
}
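// Illustrative sketch (not part of the original patch): how the NDJSON folding
// above behaves for a mixed payload of objects and arrays.
#[cfg(test)]
mod ndjson_fold_example {
    use super::*;

    #[test]
    fn folds_objects_and_arrays_into_one_array() {
        let payload = r#"{"a":1}
{"a":2}
[{"a":3},{"a":4}]"#;
        let merged = transform_ndjson_array_factory(
            serde_json::Deserializer::from_str(payload).into_iter(),
            false,
        )
        .unwrap();
        // Two objects plus a two-element array fold into four elements.
        assert_eq!(merged.as_array().map(|arr| arr.len()), Some(4));
    }
}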
#[axum_macros::debug_handler]
pub async fn log_ingester(
State(handler): State<LogHandlerRef>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(query_ctx): Extension<QueryContextRef>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<HttpResponse> {
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let table_name = query_params.table.context(InvalidParameterSnafu {
reason: "table is required",
})?;
let version = match query_params.version {
Some(version) => {
let ts = Timestamp::from_str_utc(&version).map_err(|e| {
InvalidParameterSnafu {
reason: format!("invalid pipeline version: {} with error: {}", &version, e),
}
.build()
})?;
Some(TimestampNanosecond(ts))
}
None => None,
};
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
let m: mime::Mime = content_type.clone().into();
let value = match m.subtype() {
mime::JSON => transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?,
// add more content type support
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
};
ingest_logs_inner(
handler,
pipeline_name,
version,
table_name,
value,
query_ctx,
)
.await
}
async fn ingest_logs_inner(
state: LogHandlerRef,
pipeline_name: String,
version: PipelineVersion,
table_name: String,
payload: Value,
query_ctx: QueryContextRef,
) -> Result<HttpResponse> {
let start = std::time::Instant::now();
let pipeline_data = PipelineValue::try_from(payload)
.map_err(|reason| CastTypeSnafu { msg: reason }.build())
.context(PipelineSnafu)?;
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
let transformed_data: Rows = pipeline
.exec(pipeline_data)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.context(PipelineSnafu)?;
let insert_request = RowInsertRequest {
rows: Some(transformed_data),
table_name: table_name.clone(),
};
let insert_requests = RowInsertRequests {
inserts: vec![insert_request],
};
let output = state.insert_logs(insert_requests, query_ctx).await;
let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(start.elapsed().as_millis() as u64);
Ok(response)
}


@@ -35,6 +35,8 @@ use common_query::Output;
use headers::HeaderValue;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::table::PipelineVersion;
use pipeline::{GreptimeTransformer, Pipeline};
use serde_json::Value;
use session::context::QueryContextRef;
@@ -48,6 +50,7 @@ pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler +
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
pub type LogHandlerRef = Arc<dyn LogHandler + Send + Sync>;
#[async_trait]
pub trait ScriptHandler {
@@ -118,3 +121,29 @@ pub trait OpenTelemetryProtocolHandler {
ctx: QueryContextRef,
) -> Result<Output>;
}
/// LogHandler is responsible for handling log-related requests.
/// It should be able to insert logs and manage pipelines.
/// A pipeline is a series of transformations applied to logs;
/// it is stored in the database and can be retrieved by name.
#[async_trait]
pub trait LogHandler {
async fn insert_logs(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
async fn get_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Arc<Pipeline<GreptimeTransformer>>>;
async fn insert_pipeline(
&self,
name: &str,
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
) -> Result<()>;
async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>;
}