Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-04 20:32:56 +00:00
chore: shared pipeline under same catalog with compatibility (#6143)

* chore: support shared pipeline under catalog with compatibility
* test: add test for cross schema ref
* chore: use empty string schema by default
* chore: remove unwrap in the patch
* fix: df check
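The core of the change: pipelines are now looked up by name across the whole catalog rather than per schema. Newly created pipelines are stored under the empty-string schema (shared), while pre-existing rows keep their original schema value for compatibility. When several schemas hold a pipeline with the same name, the empty-schema copy wins, then the caller's current schema, and anything else is an error (the new MultiPipelineWithDiffSchema variant below). A minimal standalone sketch of that precedence, with simplified stand-in types rather than the committed code:

    /// Candidate rows are (content, schema, created_at), as returned by the
    /// reworked find_pipeline in this patch.
    fn resolve<'a>(
        candidates: &'a [(String, String, i64)],
        current_schema: &str,
    ) -> Result<&'a (String, String, i64), String> {
        const EMPTY_SCHEMA_NAME: &str = "";
        if candidates.len() == 1 {
            return Ok(&candidates[0]);
        }
        candidates
            .iter()
            // a shared (empty-schema) pipeline takes priority...
            .find(|c| c.1 == EMPTY_SCHEMA_NAME)
            // ...then one under the caller's current schema...
            .or_else(|| candidates.iter().find(|c| c.1 == current_schema))
            // ...otherwise the lookup is ambiguous; report the schemas
            .ok_or_else(|| {
                candidates
                    .iter()
                    .map(|c| c.1.as_str())
                    .collect::<Vec<_>>()
                    .join(",")
            })
    }

    fn main() {
        let rows = vec![
            ("v1".to_string(), "db_a".to_string(), 1),
            ("v2".to_string(), "".to_string(), 2),
        ];
        // the shared (empty-schema) copy is chosen even though db_a has one too
        assert_eq!(resolve(&rows, "db_b").unwrap().0, "v2");
    }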
@@ -650,6 +650,24 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display(
+        "Multiple pipelines with different schemas found, but none under the current schema. Please replicate one of them or delete others until only one schema is left. schemas: {}",
+        schemas
+    ))]
+    MultiPipelineWithDiffSchema {
+        schemas: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "The column lengths of the returned record batch do not match, see debug log for details"
+    ))]
+    RecordBatchLenNotMatch {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to collect record batch"))]
     CollectRecords {
         #[snafu(implicit)]

@@ -750,7 +768,8 @@ impl ErrorExt for Error {
             PipelineNotFound { .. }
             | InvalidPipelineVersion { .. }
             | InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments,
-            BuildDfLogicalPlan { .. } => StatusCode::Internal,
+            MultiPipelineWithDiffSchema { .. } => StatusCode::IllegalState,
+            BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
             DataFrame { source, .. } => source.status_code(),
             Catalog { source, .. } => source.status_code(),
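For readers unfamiliar with snafu: the derive generates a `*Snafu` context selector per variant, which is how later hunks construct these errors (`MultiPipelineWithDiffSchemaSnafu { schemas }.fail()?`, `ensure!(cond, RecordBatchLenNotMatchSnafu)`). A tiny self-contained sketch of that pattern, leaving out the `location` fields that the real enum fills implicitly:

    use snafu::{ensure, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Multiple pipelines with different schemas found. schemas: {}", schemas))]
        MultiPipelineWithDiffSchema { schemas: String },

        #[snafu(display("The column lengths of the returned record batch do not match"))]
        RecordBatchLenNotMatch,
    }

    fn pick(schemas: &[&str]) -> Result<(), Error> {
        // ensure! bails out through the context selector when the condition fails
        ensure!(
            schemas.len() <= 1,
            MultiPipelineWithDiffSchemaSnafu {
                schemas: schemas.join(","),
            }
        );
        Ok(())
    }

    fn main() {
        // prints the display string with both offending schemas
        println!("{}", pick(&["public", "test_db"]).unwrap_err());
    }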
@@ -29,6 +29,7 @@ use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RES
 use crate::table::PipelineTable;
 use crate::{GreptimePipelineParams, Pipeline, Value};
 
+mod pipeline_cache;
 pub mod pipeline_operator;
 pub mod table;
 pub mod util;
src/pipeline/src/manager/pipeline_cache.rs (new file, +184)
@@ -0,0 +1,184 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use datatypes::timestamp::TimestampNanosecond;
+use moka::sync::Cache;
+
+use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
+use crate::etl::Pipeline;
+use crate::manager::PipelineVersion;
+use crate::table::EMPTY_SCHEMA_NAME;
+use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
+
+/// Pipeline table cache size.
+const PIPELINES_CACHE_SIZE: u64 = 10000;
+/// Pipeline table cache time to live.
+const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
+
+/// The pipeline cache lives in a separate file on purpose,
+/// to encapsulate the inner caches. Only public methods are exposed.
+pub(crate) struct PipelineCache {
+    pipelines: Cache<String, Arc<Pipeline>>,
+    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+}
+
+impl PipelineCache {
+    pub(crate) fn new() -> Self {
+        Self {
+            pipelines: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .time_to_live(PIPELINES_CACHE_TTL)
+                .build(),
+            original_pipelines: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .time_to_live(PIPELINES_CACHE_TTL)
+                .build(),
+        }
+    }
+
+    pub(crate) fn insert_pipeline_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+        pipeline: Arc<Pipeline>,
+        with_latest: bool,
+    ) {
+        insert_cache_generic(
+            &self.pipelines,
+            schema,
+            name,
+            version,
+            pipeline,
+            with_latest,
+        );
+    }
+
+    pub(crate) fn insert_pipeline_str_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+        pipeline: (String, TimestampNanosecond),
+        with_latest: bool,
+    ) {
+        insert_cache_generic(
+            &self.original_pipelines,
+            schema,
+            name,
+            version,
+            pipeline,
+            with_latest,
+        );
+    }
+
+    pub(crate) fn get_pipeline_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+    ) -> Result<Option<Arc<Pipeline>>> {
+        get_cache_generic(&self.pipelines, schema, name, version)
+    }
+
+    pub(crate) fn get_pipeline_str_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+    ) -> Result<Option<(String, TimestampNanosecond)>> {
+        get_cache_generic(&self.original_pipelines, schema, name, version)
+    }
+
+    // remove cache entries for this version and for "latest", across all schemas
+    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
+        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
+        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
+
+        let ks = self
+            .pipelines
+            .iter()
+            .filter_map(|(k, _)| {
+                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
+                    Some(k.clone())
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        for k in ks {
+            let k = k.as_str();
+            self.pipelines.remove(k);
+            self.original_pipelines.remove(k);
+        }
+    }
+}
+
+fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
+    cache: &Cache<String, T>,
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+    value: T,
+    with_latest: bool,
+) {
+    let k = generate_pipeline_cache_key(schema, name, version);
+    cache.insert(k, value.clone());
+    if with_latest {
+        let k = generate_pipeline_cache_key(schema, name, None);
+        cache.insert(k, value);
+    }
+}
+
+fn get_cache_generic<T: Clone + Send + Sync + 'static>(
+    cache: &Cache<String, T>,
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+) -> Result<Option<T>> {
+    // try the empty schema first
+    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
+    }
+    // then the input schema
+    let k = generate_pipeline_cache_key(schema, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
+    }
+
+    // finally try all schemas
+    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
+    let mut ks = cache
+        .iter()
+        .filter(|e| e.0.ends_with(&suffix_key))
+        .collect::<Vec<_>>();
+
+    match ks.len() {
+        0 => Ok(None),
+        1 => Ok(Some(ks.remove(0).1)),
+        _ => MultiPipelineWithDiffSchemaSnafu {
+            schemas: ks
+                .iter()
+                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
+                .collect::<Vec<_>>()
+                .join(","),
+        }
+        .fail()?,
+    }
+}
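Cache keys are what make the suffix tricks above work: generate_pipeline_cache_key_suffix (shown verbatim in the util hunk below) produces "/{name}/{version}" or "/{name}/latest", and the full key prepends the schema; that full-key layout is an assumption here, since generate_pipeline_cache_key's body is not part of this diff. A standalone sketch of why a single ends_with match both finds cross-schema entries and lets remove_cache evict a pipeline from every schema at once:

    // Assumed full-key layout "{schema}/{name}/{version-or-latest}"; only the
    // suffix helper below is verbatim from this patch.
    fn cache_key(schema: &str, name: &str, version: Option<i64>) -> String {
        format!("{schema}{}", cache_key_suffix(name, version))
    }

    fn cache_key_suffix(name: &str, version: Option<i64>) -> String {
        match version {
            Some(v) => format!("/{name}/{v}"),
            None => format!("/{name}/latest"),
        }
    }

    fn main() {
        let keys = [
            cache_key("", "test", None),       // shared copy -> "/test/latest"
            cache_key("public", "test", None), // per-schema copy -> "public/test/latest"
        ];
        let suffix = cache_key_suffix("test", None);
        // one suffix matches the pipeline's key in every schema
        assert!(keys.iter().all(|k| k.ends_with(&suffix)));
    }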
@@ -236,7 +236,7 @@ impl PipelineOperator {
         let timer = Instant::now();
         self.get_pipeline_table_from_cache(query_ctx.current_catalog())
             .context(PipelineTableNotFoundSnafu)?
-            .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
+            .insert_and_compile(name, content_type, pipeline)
             .inspect(|re| {
                 METRIC_PIPELINE_CREATE_HISTOGRAM
                     .with_label_values(&[&re.is_ok().to_string()])

@@ -259,7 +259,7 @@ impl PipelineOperator {
         let timer = Instant::now();
         self.get_pipeline_table_from_cache(query_ctx.current_catalog())
             .context(PipelineTableNotFoundSnafu)?
-            .delete_pipeline(&query_ctx.current_schema(), name, version)
+            .delete_pipeline(name, version)
             .inspect(|re| {
                 METRIC_PIPELINE_DELETE_HISTOGRAM
                     .with_label_values(&[&re.is_ok().to_string()])
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::sync::Arc;
-use std::time::Duration;
 
 use api::v1::value::ValueData;
 use api::v1::{

@@ -24,13 +23,12 @@ use common_query::OutputData;
 use common_recordbatch::util as record_util;
 use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
-use datafusion::logical_expr::col;
 use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan};
+use datafusion_expr::{col, DmlStatement, LogicalPlan};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
-use moka::sync::Cache;
+use itertools::Itertools;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
 use query::dataframe::DataFrame;

@@ -43,23 +41,20 @@ use table::TableRef;
 use crate::error::{
     BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
     ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
-    PipelineNotFoundSnafu, Result,
+    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
 };
 use crate::etl::{parse, Content, Pipeline};
+use crate::manager::pipeline_cache::PipelineCache;
 use crate::manager::{PipelineInfo, PipelineVersion};
-use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};
+use crate::util::prepare_dataframe_conditions;
 
 pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
 pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
-pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
+const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
 const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
 const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
 pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
 
-/// Pipeline table cache size.
-const PIPELINES_CACHE_SIZE: u64 = 10000;
-/// Pipeline table cache time to live.
-const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
+pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
 
 /// PipelineTable is a table that stores the pipeline schema and content.
 /// Every catalog has its own pipeline table.
@@ -68,8 +63,7 @@ pub struct PipelineTable {
     statement_executor: StatementExecutorRef,
     table: TableRef,
     query_engine: QueryEngineRef,
-    pipelines: Cache<String, Arc<Pipeline>>,
-    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+    cache: PipelineCache,
 }
 
 impl PipelineTable {

@@ -85,14 +79,7 @@ impl PipelineTable {
             statement_executor,
             table,
             query_engine,
-            pipelines: Cache::builder()
-                .max_capacity(PIPELINES_CACHE_SIZE)
-                .time_to_live(PIPELINES_CACHE_TTL)
-                .build(),
-            original_pipelines: Cache::builder()
-                .max_capacity(PIPELINES_CACHE_SIZE)
-                .time_to_live(PIPELINES_CACHE_TTL)
-                .build(),
+            cache: PipelineCache::new(),
         }
     }
@@ -214,7 +201,6 @@ impl PipelineTable {
     /// Insert a pipeline into the pipeline table.
     async fn insert_pipeline_to_pipeline_table(
        &self,
-        schema: &str,
        name: &str,
        content_type: &str,
        pipeline: &str,

@@ -230,7 +216,7 @@ impl PipelineTable {
             rows: vec![Row {
                 values: vec![
                     ValueData::StringValue(name.to_string()).into(),
-                    ValueData::StringValue(schema.to_string()).into(),
+                    ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                     ValueData::StringValue(content_type.to_string()).into(),
                     ValueData::StringValue(pipeline.to_string()).into(),
                     ValueData::TimestampNanosecondValue(now.value()).into(),

@@ -272,20 +258,15 @@ impl PipelineTable {
         name: &str,
         version: PipelineVersion,
     ) -> Result<Arc<Pipeline>> {
-        if let Some(pipeline) = self
-            .pipelines
-            .get(&generate_pipeline_cache_key(schema, name, version))
-        {
+        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
             return Ok(pipeline);
         }
 
         let pipeline = self.get_pipeline_str(schema, name, version).await?;
         let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
 
-        self.pipelines.insert(
-            generate_pipeline_cache_key(schema, name, version),
-            compiled_pipeline.clone(),
-        );
+        self.cache
+            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
         Ok(compiled_pipeline)
     }
@@ -297,28 +278,56 @@ impl PipelineTable {
         name: &str,
         version: PipelineVersion,
     ) -> Result<(String, TimestampNanosecond)> {
-        if let Some(pipeline) = self
-            .original_pipelines
-            .get(&generate_pipeline_cache_key(schema, name, version))
-        {
+        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
             return Ok(pipeline);
         }
-        let pipeline = self
-            .find_pipeline(schema, name, version)
-            .await?
-            .context(PipelineNotFoundSnafu { name, version })?;
-        self.original_pipelines.insert(
-            generate_pipeline_cache_key(schema, name, version),
-            pipeline.clone(),
-        );
-        Ok(pipeline)
+
+        let mut pipeline_vec = self.find_pipeline(name, version).await?;
+        ensure!(
+            !pipeline_vec.is_empty(),
+            PipelineNotFoundSnafu { name, version }
+        );
+
+        // if the result is exactly one pipeline, use it
+        if pipeline_vec.len() == 1 {
+            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
+            let p = (pipeline_content, version);
+            self.cache.insert_pipeline_str_cache(
+                &found_schema,
+                name,
+                Some(version),
+                p.clone(),
+                false,
+            );
+            return Ok(p);
+        }
+
+        // check if there's an empty-schema pipeline;
+        // if there isn't, check the current schema
+        let pipeline = pipeline_vec
+            .iter()
+            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
+            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
+
+        // multiple pipelines, none under the empty or current schema:
+        // throw an error
+        let (pipeline_content, found_schema, version) =
+            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
+                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
+            })?;
+
+        let v = *version;
+        let p = (pipeline_content.clone(), v);
+        self.cache
+            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
+        Ok(p)
     }
 
     /// Insert a pipeline into the pipeline table and compile it.
     /// The compiled pipeline will be inserted into the cache.
+    /// Newly created pipelines will be saved under the empty schema.
     pub async fn insert_and_compile(
         &self,
-        schema: &str,
         name: &str,
         content_type: &str,
         pipeline: &str,
@@ -326,26 +335,24 @@ impl PipelineTable {
         let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
         // we will use the version in the future
         let version = self
-            .insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline)
+            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
             .await?;
 
         {
-            self.pipelines.insert(
-                generate_pipeline_cache_key(schema, name, None),
-                compiled_pipeline.clone(),
-            );
-            self.pipelines.insert(
-                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
+            self.cache.insert_pipeline_cache(
+                EMPTY_SCHEMA_NAME,
+                name,
+                Some(TimestampNanosecond(version)),
                 compiled_pipeline.clone(),
+                true,
             );
 
-            self.original_pipelines.insert(
-                generate_pipeline_cache_key(schema, name, None),
-                (pipeline.to_owned(), TimestampNanosecond(version)),
-            );
-            self.original_pipelines.insert(
-                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
+            self.cache.insert_pipeline_str_cache(
+                EMPTY_SCHEMA_NAME,
+                name,
+                Some(TimestampNanosecond(version)),
                 (pipeline.to_owned(), TimestampNanosecond(version)),
+                true,
             );
         }

@@ -354,7 +361,6 @@ impl PipelineTable {
 
     pub async fn delete_pipeline(
         &self,
-        schema: &str,
         name: &str,
         version: PipelineVersion,
     ) -> Result<Option<()>> {

@@ -365,8 +371,8 @@ impl PipelineTable {
         );
 
         // 1. check the pipeline exists in the catalog
-        let pipeline = self.find_pipeline(schema, name, version).await?;
-        if pipeline.is_none() {
+        let pipeline = self.find_pipeline(name, version).await?;
+        if pipeline.is_empty() {
             return Ok(None);
         }

@@ -378,7 +384,7 @@ impl PipelineTable {
         let DataFrame::DataFusion(dataframe) = dataframe;
 
         let dataframe = dataframe
-            .filter(prepare_dataframe_conditions(schema, name, version))
+            .filter(prepare_dataframe_conditions(name, version))
             .context(BuildDfLogicalPlanSnafu)?;
 
         // 3. prepare dml stmt
@@ -425,24 +431,19 @@ impl PipelineTable {
         );
 
         // remove cache with version and latest
-        self.pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, version));
-        self.pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, None));
-        self.original_pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, version));
-        self.original_pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, None));
+        self.cache.remove_cache(name, version);
 
         Ok(Some(()))
     }
 
+    // find all pipelines with the given name and version;
+    // there could be multiple with different schemas
+    // return format: (pipeline content, schema, created_at)
     async fn find_pipeline(
         &self,
-        schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
         // 1. prepare dataframe
         let dataframe = self
             .query_engine
@@ -450,19 +451,19 @@ impl PipelineTable {
             .context(DataFrameSnafu)?;
         let DataFrame::DataFusion(dataframe) = dataframe;
 
+        // select all pipelines with name and version
         let dataframe = dataframe
-            .filter(prepare_dataframe_conditions(schema, name, version))
+            .filter(prepare_dataframe_conditions(name, version))
             .context(BuildDfLogicalPlanSnafu)?
             .select_columns(&[
                 PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
+                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                 PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
             ])
             .context(BuildDfLogicalPlanSnafu)?
             .sort(vec![
                 col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
             ])
-            .context(BuildDfLogicalPlanSnafu)?
-            .limit(0, Some(1))
             .context(BuildDfLogicalPlanSnafu)?;
 
         let plan = dataframe.into_parts().1;
@@ -489,51 +490,70 @@ impl PipelineTable {
             .context(CollectRecordsSnafu)?;
 
         if records.is_empty() {
-            return Ok(None);
+            return Ok(vec![]);
         }
 
-        // limit 1
         ensure!(
-            records.len() == 1 && records[0].num_columns() == 2,
+            !records.is_empty() && records.iter().all(|r| r.num_columns() == 3),
             PipelineNotFoundSnafu { name, version }
         );
 
-        let pipeline_content_column = records[0].column(0);
-        let pipeline_content = pipeline_content_column
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .with_context(|| CastTypeSnafu {
-                msg: format!(
-                    "can't downcast {:?} array into string vector",
-                    pipeline_content_column.data_type()
-                ),
-            })?;
-
-        let pipeline_created_at_column = records[0].column(1);
-        let pipeline_created_at = pipeline_created_at_column
-            .as_any()
-            .downcast_ref::<TimestampNanosecondVector>()
-            .with_context(|| CastTypeSnafu {
-                msg: format!(
-                    "can't downcast {:?} array into scalar vector",
-                    pipeline_created_at_column.data_type()
-                ),
-            })?;
-
-        debug!(
-            "find_pipeline_by_name: pipeline_content: {:?}, pipeline_created_at: {:?}",
-            pipeline_content, pipeline_created_at
-        );
-
-        ensure!(
-            pipeline_content.len() == 1,
-            PipelineNotFoundSnafu { name, version }
-        );
-
-        // Safety: asserted above
-        Ok(Some((
-            pipeline_content.get_data(0).unwrap().to_string(),
-            pipeline_created_at.get_data(0).unwrap(),
-        )))
+        let mut re = Vec::with_capacity(records.len());
+        for r in records {
+            let pipeline_content_column = r.column(0);
+            let pipeline_content = pipeline_content_column
+                .as_any()
+                .downcast_ref::<StringVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into string vector",
+                        pipeline_content_column.data_type()
+                    ),
+                })?;
+
+            let pipeline_schema_column = r.column(1);
+            let pipeline_schema = pipeline_schema_column
+                .as_any()
+                .downcast_ref::<StringVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into string vector",
+                        pipeline_schema_column.data_type()
+                    ),
+                })?;
+
+            let pipeline_created_at_column = r.column(2);
+            let pipeline_created_at = pipeline_created_at_column
+                .as_any()
+                .downcast_ref::<TimestampNanosecondVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into scalar vector",
+                        pipeline_created_at_column.data_type()
+                    ),
+                })?;
+
+            debug!(
+                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
+                pipeline_content, pipeline_schema, pipeline_created_at
+            );
+
+            ensure!(
+                pipeline_content.len() == pipeline_schema.len()
+                    && pipeline_schema.len() == pipeline_created_at.len(),
+                RecordBatchLenNotMatchSnafu
+            );
+
+            let len = pipeline_content.len();
+            for i in 0..len {
+                re.push((
+                    pipeline_content.get_data(i).unwrap().to_string(),
+                    pipeline_schema.get_data(i).unwrap().to_string(),
+                    pipeline_created_at.get_data(i).unwrap(),
+                ));
+            }
+        }
+
+        Ok(re)
     }
 }
@@ -19,7 +19,6 @@ use datatypes::timestamp::TimestampNanosecond;
 use crate::error::{InvalidPipelineVersionSnafu, Result};
 use crate::table::{
     PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME,
-    PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
 };
 use crate::PipelineVersion;

@@ -34,15 +33,8 @@ pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion>
     }
 }
 
-pub(crate) fn prepare_dataframe_conditions(
-    schema: &str,
-    name: &str,
-    version: PipelineVersion,
-) -> Expr {
-    let mut conditions = vec![
-        col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
-        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
-    ];
+pub(crate) fn prepare_dataframe_conditions(name: &str, version: PipelineVersion) -> Expr {
+    let mut conditions = vec![col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name))];
 
     if let Some(v) = version {
         conditions

@@ -63,6 +55,13 @@ pub(crate) fn generate_pipeline_cache_key(
     }
 }
 
+pub(crate) fn generate_pipeline_cache_key_suffix(name: &str, version: PipelineVersion) -> String {
+    match version {
+        Some(version) => format!("/{}/{}", name, i64::from(version)),
+        None => format!("/{}/latest", name),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
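With the schema predicate gone, prepare_dataframe_conditions matches every schema's copy of a pipeline by name; the version condition (whose body falls outside this hunk) still pins an exact row when supplied. A rough standalone illustration with datafusion_expr; the version-to-created_at comparison is an assumption, not shown in this patch:

    use datafusion_expr::{col, lit, Expr};

    // hypothetical mirror of prepare_dataframe_conditions after this change
    fn conditions(name: &str, version: Option<i64>) -> Expr {
        let mut expr = col("name").eq(lit(name));
        if let Some(v) = version {
            // assumed: a pinned version filters on the created_at timestamp
            expr = expr.and(col("created_at").eq(lit(v)));
        }
        expr
    }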
@@ -177,7 +177,7 @@ pub async fn query_pipeline(
         pipeline_name,
         query_params
             .version
-            .unwrap_or(pipeline_version.0.to_iso8601_string()),
+            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
         start.elapsed().as_millis() as u64,
         Some(pipeline),
     ))
@@ -1403,7 +1403,7 @@ pub async fn test_pipeline_api(store_type: StorageType) {
     // handshake
     let client = TestClient::new(app).await;
 
-    let body = r#"
+    let pipeline_body = r#"
 processors:
   - date:
       field: time

@@ -1437,7 +1437,7 @@ transform:
     let res = client
         .post("/v1/pipelines/greptime_guagua")
         .header("Content-Type", "application/x-yaml")
-        .body(body)
+        .body(pipeline_body)
         .send()
         .await;

@@ -1452,7 +1452,7 @@ transform:
     let res = client
         .post("/v1/pipelines/test")
         .header("Content-Type", "application/x-yaml")
-        .body(body)
+        .body(pipeline_body)
         .send()
         .await;

@@ -1505,7 +1505,7 @@ transform:
         .as_str()
         .unwrap();
     let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap();
-    let body_yaml = YamlLoader::load_from_str(body).unwrap();
+    let body_yaml = YamlLoader::load_from_str(pipeline_body).unwrap();
     assert_eq!(docs, body_yaml);
 
     // Do not specify version, get the latest version

@@ -1560,7 +1560,43 @@ transform:
     )
     .await;
 
-    // 4. remove pipeline
+    // 4. cross-ref pipeline
+    // create database test_db
+    let res = client
+        .post("/v1/sql?sql=create database test_db")
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // check test_db created
+    validate_data(
+        "pipeline_db_schema",
+        &client,
+        "show databases",
+        "[[\"greptime_private\"],[\"information_schema\"],[\"public\"],[\"test_db\"]]",
+    )
+    .await;
+
+    // cross ref using public's pipeline
+    let res = client
+        .post("/v1/ingest?db=test_db&table=logs1&pipeline_name=test")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // check write success
+    validate_data(
+        "pipeline_db_schema",
+        &client,
+        "select * from test_db.logs1",
+        "[[2436,2528,\"INTERACT.MANAGER\",\"I\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",1716668197217000000]]",
+    )
+    .await;
+
+    // 5. remove pipeline
     let encoded_ver_str: String =
         url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
     let res = client

@@ -1580,7 +1616,7 @@ transform:
         format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str()
     );
 
-    // 5. write data failed
+    // 6. write data failed
     let res = client
         .post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
         .header("Content-Type", "application/json")