Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-24 07:00:00 +00:00
Compare commits: async_deco ... v0.13.1 (1 commit)
Commit: 557e850d87
Cargo.lock (generated): 24 changed lines
@@ -4119,12 +4119,11 @@ dependencies = [
 
 [[package]]
 name = "flate2"
-version = "1.1.0"
+version = "1.0.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
+checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
 dependencies = [
  "crc32fast",
- "libz-rs-sys",
  "libz-sys",
  "miniz_oxide",
 ]

@@ -6279,15 +6278,6 @@ dependencies = [
  "vcpkg",
 ]
 
-[[package]]
-name = "libz-rs-sys"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
-dependencies = [
- "zlib-rs",
-]
-
 [[package]]
 name = "libz-sys"
 version = "1.1.20"

@@ -6832,9 +6822,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
 
 [[package]]
 name = "miniz_oxide"
-version = "0.8.5"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
+checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
 dependencies = [
  "adler2",
 ]

@@ -13964,12 +13954,6 @@ dependencies = [
  "syn 2.0.96",
 ]
 
-[[package]]
-name = "zlib-rs"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
-
 [[package]]
 name = "zstd"
 version = "0.11.2+zstd.1.5.2"

@@ -126,7 +126,6 @@ deadpool-postgres = "0.12"
 derive_builder = "0.12"
 dotenv = "0.15"
 etcd-client = "0.14"
-flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"

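For context: flate2's "zlib-rs" feature on the removed line selects the pure-Rust zlib backend via libz-rs-sys/zlib-rs, which is why those crates disappear from the Cargo.lock hunks above when the pin is dropped back to flate2 1.0 with its default backend. The flate2 API is the same either way; a minimal sketch of typical usage (the helper name is illustrative, not from the codebase):

use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

// Gzip-compress a byte slice. The code is identical no matter which backend
// (zlib-rs, the default miniz_oxide, or system zlib) flate2 is built with.
fn gzip(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data)?;
    encoder.finish()
}
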
@@ -1,191 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_error::ext::BoxedError;
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use snafu::ResultExt;

use crate::error::{DataFusionSnafu, Error, QueryPlanSnafu};
use crate::query_engine::DefaultPlanDecoder;

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct UnexpandedNode {
    pub inner: Vec<u8>,
    pub schema: DFSchemaRef,
}

impl UnexpandedNode {
    pub fn new_no_schema(inner: Vec<u8>) -> Self {
        Self {
            inner,
            schema: Arc::new(DFSchema::empty()),
        }
    }
}

impl PartialOrd for UnexpandedNode {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner.partial_cmp(&other.inner)
    }
}

impl UnexpandedNode {
    const NAME: &'static str = "Unexpanded";
}

impl UserDefinedLogicalNodeCore for UnexpandedNode {
    fn name(&self) -> &'static str {
        Self::NAME
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    fn with_exprs_and_inputs(
        &self,
        _: Vec<datafusion_expr::Expr>,
        _: Vec<LogicalPlan>,
    ) -> datafusion_common::Result<Self> {
        Ok(self.clone())
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", Self::NAME)
    }

    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }
}

/// Rewrite a decoded `LogicalPlan` so all `UnexpandedNode`s are expanded.
///
/// This is a hack to support decoding substrait plans using async functions.
///
/// The corresponding encode method should put the custom logical node's input plan
/// into an `UnexpandedNode` after encoding it into bytes.
pub struct UnexpandDecoder {
    pub default_decoder: DefaultPlanDecoder,
}

impl UnexpandDecoder {
    pub fn new(default_decoder: DefaultPlanDecoder) -> Self {
        Self { default_decoder }
    }
}

impl UnexpandDecoder {
    /// Decode a substrait plan into a `LogicalPlan` and recursively expand all unexpanded nodes.
    ///
    /// Supports async functions so our custom logical plan's input can be decoded as well.
    pub async fn decode(
        &self,
        message: bytes::Bytes,
        catalog_list: Arc<dyn CatalogProviderList>,
        optimize: bool,
    ) -> Result<LogicalPlan, Error> {
        let plan = self
            .default_decoder
            .decode(message, catalog_list.clone(), optimize)
            .await
            .map_err(BoxedError::new)
            .context(QueryPlanSnafu)?;
        self.expand(plan, catalog_list, optimize)
            .await
            .map_err(BoxedError::new)
            .context(QueryPlanSnafu)
    }

    /// Recursively expand all unexpanded nodes in the plan.
    pub async fn expand(
        &self,
        plan: LogicalPlan,
        catalog_list: Arc<dyn CatalogProviderList>,
        optimize: bool,
    ) -> Result<LogicalPlan, Error> {
        let mut cur_unexpanded_node = None;
        let mut root_expanded_plan = plan.clone();
        loop {
            root_expanded_plan
                .apply(|p| {
                    if let LogicalPlan::Extension(node) = p {
                        if node.node.name() == UnexpandedNode::NAME {
                            let node = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
                                DataFusionError::Plan(
                                    "Failed to downcast to UnexpandedNode".to_string(),
                                ),
                            )?;
                            cur_unexpanded_node = Some(node.clone());
                            return Ok(TreeNodeRecursion::Stop);
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
                .context(DataFusionSnafu)?;

            if let Some(unexpanded) = cur_unexpanded_node.take() {
                let decoded = self
                    .default_decoder
                    .decode(
                        unexpanded.inner.clone().into(),
                        catalog_list.clone(),
                        optimize,
                    )
                    .await
                    .map_err(BoxedError::new)
                    .context(QueryPlanSnafu)?;
                let mut decoded = Some(decoded);

                // Replace it with the decoded plan; if any node is still unexpanded,
                // the first one we encounter is this same node.
                root_expanded_plan = root_expanded_plan
                    .transform(|p| {
                        let Some(decoded) = decoded.take() else {
                            return Ok(Transformed::no(p));
                        };

                        if let LogicalPlan::Extension(node) = &p
                            && node.node.name() == UnexpandedNode::NAME
                        {
                            let _ = node.node.as_any().downcast_ref::<UnexpandedNode>().ok_or(
                                DataFusionError::Plan(
                                    "Failed to downcast to UnexpandedNode".to_string(),
                                ),
                            )?;
                            Ok(Transformed::yes(decoded))
                        } else {
                            Ok(Transformed::no(p))
                        }
                    })
                    .context(DataFusionSnafu)?
                    .data;
            } else {
                // all nodes are expanded
                break;
            }
        }

        Ok(root_expanded_plan)
    }
}

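The encode-side counterpart mentioned in the doc comment above is not shown in this diff. A minimal, hypothetical sketch of what it could look like, assuming the custom node's input plan has already been serialized to Substrait bytes and `UnexpandedNode` is in scope (`wrap_encoded_input` is an illustrative name, not a function from the codebase):

use std::sync::Arc;

use datafusion_expr::{Extension, LogicalPlan};

// Hypothetical helper: wrap already-encoded input-plan bytes in an UnexpandedNode
// extension so that UnexpandDecoder::expand can find and decode it later.
fn wrap_encoded_input(encoded: Vec<u8>) -> LogicalPlan {
    LogicalPlan::Extension(Extension {
        node: Arc::new(UnexpandedNode::new_no_schema(encoded)),
    })
}
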
@@ -26,7 +26,6 @@ pub mod dist_plan;
 pub mod dummy_catalog;
 pub mod error;
 pub mod executor;
-pub mod expand;
 pub mod log_query;
 pub mod metrics;
 mod optimizer;

@@ -12,12 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::io::BufRead;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
 
 use api::v1::RowInsertRequests;
 use async_trait::async_trait;
+use axum::body::Bytes;
 use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
 use axum::http::header::CONTENT_TYPE;
 use axum::http::{HeaderMap, StatusCode};

@@ -389,8 +391,8 @@ pub struct PipelineDryrunParams {
 /// Check if the payload is valid json
 /// Check if the payload contains pipeline or pipeline_name and data
 /// Return Some if valid, None if invalid
-fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> {
-    match serde_json::from_str::<PipelineDryrunParams>(payload) {
+fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
+    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
         // payload with pipeline or pipeline_name and data is array
         Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
         // because of the pipeline_name or pipeline is required

@@ -432,7 +434,7 @@ pub async fn pipeline_dryrun(
     Query(query_params): Query<LogIngesterQueryParams>,
     Extension(mut query_ctx): Extension<QueryContext>,
     TypedHeader(content_type): TypedHeader<ContentType>,
-    payload: String,
+    payload: Bytes,
 ) -> Result<Response> {
     let handler = log_state.log_handler;
 
@@ -514,7 +516,7 @@ pub async fn log_ingester(
     Extension(mut query_ctx): Extension<QueryContext>,
     TypedHeader(content_type): TypedHeader<ContentType>,
     headers: HeaderMap,
-    payload: String,
+    payload: Bytes,
 ) -> Result<HttpResponse> {
     // validate source and payload
     let source = query_params.source.as_deref();

@@ -565,40 +567,45 @@ pub async fn log_ingester(
 
 fn extract_pipeline_value_by_content_type(
     content_type: ContentType,
-    payload: String,
+    payload: Bytes,
     ignore_errors: bool,
 ) -> Result<Vec<Value>> {
     Ok(match content_type {
         ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
-            Deserializer::from_str(&payload).into_iter(),
+            Deserializer::from_slice(&payload).into_iter(),
             ignore_errors,
         )?,
         ct if ct == *NDJSON_CONTENT_TYPE => {
             let mut result = Vec::with_capacity(1000);
             for (index, line) in payload.lines().enumerate() {
-                match serde_json::from_str(line) {
-                    Ok(v) => {
-                        result.push(v);
-                    }
-                    Err(_) => {
-                        if !ignore_errors {
-                            warn!(
-                                "invalid json item in array, index: {:?}, value: {:?}",
-                                index, line
-                            );
-                            return InvalidParameterSnafu {
-                                reason: format!("invalid item:{} in array", line),
-                            }
-                            .fail();
-                        }
-                    }
-                }
+                let line = match line {
+                    Ok(line) if !line.is_empty() => line,
+                    Ok(_) => continue, // Skip empty lines
+                    Err(_) if ignore_errors => continue,
+                    Err(e) => {
+                        warn!(e; "invalid string at index: {}", index);
+                        return InvalidParameterSnafu {
+                            reason: format!("invalid line at index: {}", index),
+                        }
+                        .fail();
+                    }
+                };
+
+                if let Ok(v) = serde_json::from_str(&line) {
+                    result.push(v);
+                } else if !ignore_errors {
+                    warn!("invalid JSON at index: {}, content: {:?}", index, line);
+                    return InvalidParameterSnafu {
+                        reason: format!("invalid JSON at index: {}", index),
+                    }
+                    .fail();
+                }
             }
             result
         }
         ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
             .lines()
-            .filter(|line| !line.is_empty())
+            .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
             .map(|line| json!({"message": line}))
             .collect(),
         _ => UnsupportedContentTypeSnafu { content_type }.fail()?,

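The switch from String to Bytes works because bytes::Bytes dereferences to &[u8], and &[u8] implements std::io::BufRead, so the handler can walk NDJSON lines without first validating the whole body as UTF-8; that is what the added `use std::io::BufRead;` import is for. A standalone sketch of the same pattern (`parse_ndjson` is an illustrative name, not the handler itself):

use std::io::BufRead;

use bytes::Bytes;
use serde_json::Value;

// Parse newline-delimited JSON from a raw byte payload, skipping empty lines
// and silently dropping lines that are not valid UTF-8 or not valid JSON.
fn parse_ndjson(payload: &Bytes) -> Vec<Value> {
    let mut out = Vec::new();
    for line in payload.lines() {
        // `lines()` comes from `BufRead` on `&[u8]` and yields `io::Result<String>`.
        let Ok(line) = line else { continue };
        if line.is_empty() {
            continue;
        }
        if let Ok(value) = serde_json::from_str::<Value>(&line) {
            out.push(value);
        }
    }
    out
}
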
@@ -677,7 +684,8 @@ pub(crate) async fn ingest_logs_inner(
 pub trait LogValidator: Send + Sync {
     /// validate payload by source before processing
     /// Return a `Some` result to indicate validation failure.
-    async fn validate(&self, source: Option<&str>, payload: &str) -> Option<Result<HttpResponse>>;
+    async fn validate(&self, source: Option<&str>, payload: &Bytes)
+        -> Option<Result<HttpResponse>>;
 }
 
 pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

@@ -731,17 +739,17 @@ mod tests {
             {"a": 1}
             {"b": 2"}
             {"c": 1}
-        "#;
+        "#
+        .as_bytes();
+        let payload = Bytes::from_static(payload);
+
         let fail_rest =
-            extract_pipeline_value_by_content_type(ContentType::json(), payload.to_string(), true);
+            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
         assert!(fail_rest.is_ok());
         assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]);
 
-        let fail_only_wrong = extract_pipeline_value_by_content_type(
-            NDJSON_CONTENT_TYPE.clone(),
-            payload.to_string(),
-            true,
-        );
+        let fail_only_wrong =
+            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
         assert!(fail_only_wrong.is_ok());
         assert_eq!(
             fail_only_wrong.unwrap(),

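For context on the test change: bytes::Bytes is a reference-counted view of a shared buffer, so cloning it is cheap, which is why the test can pass `payload.clone()` where the String-based version had to allocate with `payload.to_string()`. A small illustrative sketch (`share` is a hypothetical name):

use bytes::Bytes;

// Cloning a `Bytes` value only bumps a reference count; the underlying
// buffer is shared, not copied.
fn share(payload: Bytes) -> (Bytes, Bytes) {
    let other = payload.clone();
    (payload, other)
}
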
@@ -37,16 +37,16 @@ common-telemetry.workspace = true
 common-test-util.workspace = true
 common-time.workspace = true
 common-wal.workspace = true
-datanode.workspace = true
+datanode = { workspace = true }
 datatypes.workspace = true
 dotenv.workspace = true
-flate2.workspace = true
+flate2 = "1.0"
 flow.workspace = true
 frontend = { workspace = true, features = ["testing"] }
 futures.workspace = true
 futures-util.workspace = true
 hyper-util = { workspace = true, features = ["tokio"] }
-log-query.workspace = true
+log-query = { workspace = true }
 loki-proto.workspace = true
 meta-client.workspace = true
 meta-srv = { workspace = true, features = ["mock"] }

@@ -96,5 +96,5 @@ prost.workspace = true
 rand.workspace = true
 session = { workspace = true, features = ["testing"] }
 store-api.workspace = true
-tokio-postgres.workspace = true
+tokio-postgres = { workspace = true }
 url = "2.3"

@@ -15,8 +15,8 @@ common-error.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
 common-time.workspace = true
-datatypes.workspace = true
-flate2.workspace = true
+datatypes = { workspace = true }
+flate2 = "1.0"
 hex = "0.4"
 local-ip-address = "0.6"
 mysql = { version = "25.0.1", default-features = false, features = ["minimal", "rustls-tls"] }

@@ -31,5 +31,5 @@ tar = "0.4"
 tempfile.workspace = true
 tinytemplate = "1.2"
 tokio.workspace = true
-tokio-postgres.workspace = true
+tokio-postgres = { workspace = true }
 tokio-stream.workspace = true