fix: pipeline loading issue (#7491) (#7540)

* fix: pipeline loading issue (#7491)

* fix: pipeline loading

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change string to str

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor fix to save returned version

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: introduce PipelineContent

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: use found schema

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update CR

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: etcd sh

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typo

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Author: shuiyisong
Date: 2026-01-08 16:10:06 +08:00
Committed by: GitHub
Parent: 28124abbb7
Commit: 2fcd0cff2e
8 changed files with 123 additions and 82 deletions

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set 'meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379"' \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set 'meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379"' \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \

View File

@@ -646,10 +646,14 @@ pub enum Error {
},
#[snafu(display(
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
schemas
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
name,
current_schema,
schemas,
))]
MultiPipelineWithDiffSchema {
name: String,
current_schema: String,
schemas: String,
#[snafu(implicit)]
location: Location,
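
As a standalone illustration of what the extra name and current_schema fields add to the rendered message (hypothetical values; assumes the snafu crate with implicit Location support, as used above), the variant can be built through its generated context selector and printed:

use snafu::{Location, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
        name,
        current_schema,
        schemas,
    ))]
    MultiPipelineWithDiffSchema {
        name: String,
        current_schema: String,
        schemas: String,
        #[snafu(implicit)]
        location: Location,
    },
}

fn main() {
    // Hypothetical values, for illustration only.
    let err = MultiPipelineWithDiffSchemaSnafu {
        name: "access_log",
        current_schema: "public",
        schemas: "db_a,db_b",
    }
    .build();
    println!("{err}");
}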

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::debug;
use datatypes::timestamp::TimestampNanosecond;
use moka::sync::Cache;
@@ -33,10 +34,18 @@ const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
/// to encapsulate inner cache. Only public methods are exposed.
pub(crate) struct PipelineCache {
pipelines: Cache<String, Arc<Pipeline>>,
original_pipelines: Cache<String, (String, TimestampNanosecond)>,
original_pipelines: Cache<String, PipelineContent>,
/// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
/// The failover cache never expires, but it will be updated when the pipelines cache is updated.
failover_cache: Cache<String, (String, TimestampNanosecond)>,
failover_cache: Cache<String, PipelineContent>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PipelineContent {
pub name: String,
pub content: String,
pub version: TimestampNanosecond,
pub schema: String,
}
impl PipelineCache {
@@ -45,12 +54,17 @@ impl PipelineCache {
pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.name("pipelines")
.build(),
original_pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.name("original_pipelines")
.build(),
failover_cache: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.name("failover_cache")
.build(),
failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
}
}
@@ -72,19 +86,15 @@ impl PipelineCache {
);
}
pub(crate) fn insert_pipeline_str_cache(
&self,
schema: &str,
name: &str,
version: PipelineVersion,
pipeline: (String, TimestampNanosecond),
with_latest: bool,
) {
pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
let schema = pipeline.schema.as_str();
let name = pipeline.name.as_str();
let version = pipeline.version;
insert_cache_generic(
&self.original_pipelines,
schema,
name,
version,
Some(version),
pipeline.clone(),
with_latest,
);
@@ -92,8 +102,8 @@ impl PipelineCache {
&self.failover_cache,
schema,
name,
version,
pipeline,
Some(version),
pipeline.clone(),
with_latest,
);
}
@@ -112,7 +122,7 @@ impl PipelineCache {
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<(String, TimestampNanosecond)>> {
) -> Result<Option<PipelineContent>> {
get_cache_generic(&self.failover_cache, schema, name, version)
}
@@ -121,7 +131,7 @@ impl PipelineCache {
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<(String, TimestampNanosecond)>> {
) -> Result<Option<PipelineContent>> {
get_cache_generic(&self.original_pipelines, schema, name, version)
}
@@ -174,13 +184,13 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
version: PipelineVersion,
) -> Result<Option<T>> {
// lets try empty schema first
let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
if let Some(value) = cache.get(&k) {
let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
if let Some(value) = cache.get(&emp_key) {
return Ok(Some(value));
}
// use input schema
let k = generate_pipeline_cache_key(schema, name, version);
if let Some(value) = cache.get(&k) {
let schema_k = generate_pipeline_cache_key(schema, name, version);
if let Some(value) = cache.get(&schema_k) {
return Ok(Some(value));
}
@@ -193,14 +203,28 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
match ks.len() {
0 => Ok(None),
1 => Ok(Some(ks.remove(0).1)),
_ => MultiPipelineWithDiffSchemaSnafu {
schemas: ks
.iter()
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
.collect::<Vec<_>>()
.join(","),
1 => {
let (_, value) = ks.remove(0);
Ok(Some(value))
}
_ => {
debug!(
"caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
cache.iter().map(|e| e.0).collect::<Vec<_>>(),
emp_key,
schema_k,
suffix_key
);
MultiPipelineWithDiffSchemaSnafu {
name: name.to_string(),
current_schema: schema.to_string(),
schemas: ks
.iter()
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
.collect::<Vec<_>>()
.join(","),
}
.fail()?
}
.fail()?,
}
}
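
To make the new lookup flow easier to follow, here is a minimal, self-contained sketch of the cache's read path: the empty-schema key is tried first, then the caller's schema, and only on a miss does the suffix scan run, which raises MultiPipelineWithDiffSchema when several schemas match. This is illustrative only: it uses a plain HashMap instead of moka, an i64 in place of TimestampNanosecond, and a hypothetical key layout inferred from the split_once('/') call above.

use std::collections::HashMap;

const EMPTY_SCHEMA_NAME: &str = "";

// Simplified stand-in for this crate's PipelineContent.
#[derive(Clone, Debug)]
pub struct PipelineContent {
    pub name: String,
    pub content: String,
    pub version: i64, // TimestampNanosecond in the real code
    pub schema: String,
}

// Hypothetical key layout; the real code only implies a "schema/..." prefix.
fn cache_key(schema: &str, name: &str, version: i64) -> String {
    format!("{schema}/{name}/{version}")
}

// Empty-schema key first, then the caller's schema; a miss here falls through
// to the suffix scan that can report MultiPipelineWithDiffSchema.
fn get_cached(
    cache: &HashMap<String, PipelineContent>,
    schema: &str,
    name: &str,
    version: i64,
) -> Option<PipelineContent> {
    cache
        .get(&cache_key(EMPTY_SCHEMA_NAME, name, version))
        .or_else(|| cache.get(&cache_key(schema, name, version)))
        .cloned()
}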

View File

@@ -220,6 +220,7 @@ impl PipelineOperator {
.observe(timer.elapsed().as_secs_f64())
})
.await
.map(|p| (p.content, p.version))
}
/// Insert a pipeline into the pipeline table.

View File

@@ -44,7 +44,7 @@ use crate::error::{
MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;
@@ -258,17 +258,22 @@ impl PipelineTable {
&self,
schema: &str,
name: &str,
version: PipelineVersion,
input_version: PipelineVersion,
) -> Result<Arc<Pipeline>> {
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
return Ok(pipeline);
}
let pipeline = self.get_pipeline_str(schema, name, version).await?;
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);
self.cache
.insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
self.cache.insert_pipeline_cache(
&pipeline_content.schema,
name,
Some(pipeline_content.version),
compiled_pipeline.clone(),
input_version.is_none(),
);
Ok(compiled_pipeline)
}
@@ -278,14 +283,17 @@ impl PipelineTable {
&self,
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<(String, TimestampNanosecond)> {
if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
input_version: PipelineVersion,
) -> Result<PipelineContent> {
if let Some(pipeline) = self
.cache
.get_pipeline_str_cache(schema, name, input_version)?
{
return Ok(pipeline);
}
let mut pipeline_vec;
match self.find_pipeline(name, version).await {
match self.find_pipeline(name, input_version).await {
Ok(p) => {
METRIC_PIPELINE_TABLE_FIND_COUNT
.with_label_values(&["true"])
@@ -302,8 +310,11 @@ impl PipelineTable {
.inc();
return self
.cache
.get_failover_cache(schema, name, version)?
.ok_or(PipelineNotFoundSnafu { name, version }.build());
.get_failover_cache(schema, name, input_version)?
.context(PipelineNotFoundSnafu {
name,
version: input_version,
});
}
_ => {
// if other error, we should return it
@@ -314,42 +325,40 @@ impl PipelineTable {
};
ensure!(
!pipeline_vec.is_empty(),
PipelineNotFoundSnafu { name, version }
PipelineNotFoundSnafu {
name,
version: input_version
}
);
// if the result is exact one, use it
if pipeline_vec.len() == 1 {
let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
let p = (pipeline_content, version);
self.cache.insert_pipeline_str_cache(
&found_schema,
name,
Some(version),
p.clone(),
false,
);
return Ok(p);
let pipeline_content = pipeline_vec.remove(0);
self.cache
.insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
return Ok(pipeline_content);
}
// check if there's empty schema pipeline
// if there isn't, check current schema
let pipeline = pipeline_vec
.iter()
.find(|v| v.1 == EMPTY_SCHEMA_NAME)
.or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
.position(|v| v.schema == EMPTY_SCHEMA_NAME)
.or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
.map(|idx| pipeline_vec.remove(idx));
// multiple pipeline with no empty or current schema
// throw an error
let (pipeline_content, found_schema, version) =
pipeline.context(MultiPipelineWithDiffSchemaSnafu {
schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
})?;
let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
name: name.to_string(),
current_schema: schema.to_string(),
schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
})?;
let v = *version;
let p = (pipeline_content.clone(), v);
self.cache
.insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
Ok(p)
.insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
Ok(pipeline_content)
}
/// Insert a pipeline into the pipeline table and compile it.
@@ -376,13 +385,15 @@ impl PipelineTable {
true,
);
self.cache.insert_pipeline_str_cache(
EMPTY_SCHEMA_NAME,
name,
Some(TimestampNanosecond(version)),
(pipeline.to_owned(), TimestampNanosecond(version)),
true,
);
let pipeline_content = PipelineContent {
name: name.to_string(),
content: pipeline.to_string(),
version: TimestampNanosecond(version),
schema: EMPTY_SCHEMA_NAME.to_string(),
};
self.cache
.insert_pipeline_str_cache(&pipeline_content, true);
}
Ok((version, compiled_pipeline))
@@ -472,7 +483,7 @@ impl PipelineTable {
&self,
name: &str,
version: PipelineVersion,
) -> Result<Vec<(String, String, TimestampNanosecond)>> {
) -> Result<Vec<PipelineContent>> {
// 1. prepare dataframe
let dataframe = self
.query_engine
@@ -575,11 +586,12 @@ impl PipelineTable {
let len = pipeline_content.len();
for i in 0..len {
re.push((
pipeline_content.get_data(i).unwrap().to_string(),
pipeline_schema.get_data(i).unwrap().to_string(),
pipeline_created_at.get_data(i).unwrap(),
));
re.push(PipelineContent {
name: name.to_string(),
content: pipeline_content.get_data(i).unwrap().to_string(),
version: pipeline_created_at.get_data(i).unwrap(),
schema: pipeline_schema.get_data(i).unwrap().to_string(),
});
}
}
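
Isolated from get_pipeline_str above, the schema-selection rule as a hedged sketch (hypothetical helper and error type, simplified fields): a single row wins outright; with several rows, an empty-schema entry is preferred, then one under the caller's schema; otherwise the call fails listing every schema that matched.

const EMPTY_SCHEMA_NAME: &str = "";

// Minimal stand-in carrying only the fields this rule inspects.
#[derive(Clone, Debug)]
pub struct Candidate {
    pub schema: String,
    pub content: String,
}

// Pick one pipeline out of the rows returned by the table scan.
fn pick_pipeline(mut rows: Vec<Candidate>, current_schema: &str) -> Result<Candidate, String> {
    match rows.len() {
        0 => Err("pipeline not found".to_string()),
        1 => Ok(rows.remove(0)),
        _ => {
            let idx = rows
                .iter()
                .position(|r| r.schema == EMPTY_SCHEMA_NAME)
                .or_else(|| rows.iter().position(|r| r.schema == current_schema));
            match idx {
                Some(i) => Ok(rows.remove(i)),
                // Mirrors MultiPipelineWithDiffSchema: no row belongs to the
                // empty schema or to the caller's schema.
                None => Err(format!(
                    "multiple pipelines found under schemas: {}",
                    rows.iter()
                        .map(|r| r.schema.as_str())
                        .collect::<Vec<_>>()
                        .join(",")
                )),
            }
        }
    }
}

Note that, as in the change above, the chosen row is cached under the schema it was actually found in (pipeline_content.schema) rather than the caller's schema, which is what the "fix: use found schema" commit addresses.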

View File

@@ -263,7 +263,7 @@ impl Categorizer {
}
}
// all group by expressions are partition columns can push down, unless
// another push down(including `Limit` or `Sort`) is already in progress(which will then prvent next cond commutative node from being push down).
// another push down(including `Limit` or `Sort`) is already in progress(which will then prevent next cond commutative node from being push down).
// TODO(discord9): This is a temporary solution(that works), a better description of
// commutativity is needed under this situation.
Commutativity::ConditionalCommutative(None)

View File

@@ -203,7 +203,7 @@ impl QueryEngineState {
rules.retain(|rule| rule.name() != name);
}
/// Optimize the logical plan by the extension anayzer rules.
/// Optimize the logical plan by the extension analyzer rules.
pub fn optimize_by_extension_rules(
&self,
plan: DfLogicalPlan,