Compare commits

..

1 Commits

Author SHA1 Message Date
Yingwen
28124abbb7 feat: update dashboard to v0.11.9 (#7364) (#7371)
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: ZonaHe <zonahe@qq.com>
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-12-09 17:34:42 +08:00
6 changed files with 40 additions and 93 deletions

View File

@@ -115,7 +115,6 @@ impl PipelineHandler for Instance {
.get_pipeline_str(name, version, query_ctx)
.await
.context(PipelineSnafu)
.map(|(p, _)| p)
}
}

View File

@@ -646,14 +646,10 @@ pub enum Error {
},
#[snafu(display(
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
name,
current_schema,
schemas,
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
schemas
))]
MultiPipelineWithDiffSchema {
name: String,
current_schema: String,
schemas: String,
#[snafu(implicit)]
location: Location,

View File

@@ -15,7 +15,6 @@
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::debug;
use datatypes::timestamp::TimestampNanosecond;
use moka::sync::Cache;
@@ -46,17 +45,12 @@ impl PipelineCache {
pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.name("pipelines")
.build(),
original_pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.name("original_pipelines")
.build(),
failover_cache: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.name("failover_cache")
.build(),
failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
}
}
@@ -109,7 +103,7 @@ impl PipelineCache {
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<(Arc<Pipeline>, String)>> {
) -> Result<Option<Arc<Pipeline>>> {
get_cache_generic(&self.pipelines, schema, name, version)
}
@@ -118,7 +112,7 @@ impl PipelineCache {
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<((String, TimestampNanosecond), String)>> {
) -> Result<Option<(String, TimestampNanosecond)>> {
get_cache_generic(&self.failover_cache, schema, name, version)
}
@@ -127,7 +121,7 @@ impl PipelineCache {
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<((String, TimestampNanosecond), String)>> {
) -> Result<Option<(String, TimestampNanosecond)>> {
get_cache_generic(&self.original_pipelines, schema, name, version)
}
@@ -178,16 +172,16 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
schema: &str,
name: &str,
version: PipelineVersion,
) -> Result<Option<(T, String)>> {
) -> Result<Option<T>> {
// lets try empty schema first
let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
if let Some(value) = cache.get(&emp_key) {
return Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())));
let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
if let Some(value) = cache.get(&k) {
return Ok(Some(value));
}
// use input schema
let schema_k = generate_pipeline_cache_key(schema, name, version);
if let Some(value) = cache.get(&schema_k) {
return Ok(Some((value, schema.to_string())));
let k = generate_pipeline_cache_key(schema, name, version);
if let Some(value) = cache.get(&k) {
return Ok(Some(value));
}
// try all schemas
@@ -199,32 +193,14 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
match ks.len() {
0 => Ok(None),
1 => {
let (key, value) = ks.remove(0);
if let Some((key_schema, _)) = key.split_once("/") {
Ok(Some((value, key_schema.to_string())))
} else {
Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())))
}
}
_ => {
debug!(
"caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
cache.iter().map(|e| e.0).collect::<Vec<_>>(),
emp_key,
schema_k,
suffix_key
);
MultiPipelineWithDiffSchemaSnafu {
name: name.to_string(),
current_schema: schema.to_string(),
schemas: ks
.iter()
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
.collect::<Vec<_>>()
.join(","),
}
.fail()?
1 => Ok(Some(ks.remove(0).1)),
_ => MultiPipelineWithDiffSchemaSnafu {
schemas: ks
.iter()
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
.collect::<Vec<_>>()
.join(","),
}
.fail()?,
}
}

View File

@@ -205,7 +205,7 @@ impl PipelineOperator {
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<((String, TimestampNanosecond), String)> {
) -> Result<(String, TimestampNanosecond)> {
let schema = query_ctx.current_schema();
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

View File

@@ -261,19 +261,14 @@ impl PipelineTable {
version: PipelineVersion,
) -> Result<Arc<Pipeline>> {
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
return Ok(pipeline.0);
return Ok(pipeline);
}
let (pipeline, found_schema) = self.get_pipeline_str(schema, name, version).await?;
let pipeline = self.get_pipeline_str(schema, name, version).await?;
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
self.cache.insert_pipeline_cache(
&found_schema,
name,
version,
compiled_pipeline.clone(),
version.is_none(),
);
self.cache
.insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
Ok(compiled_pipeline)
}
@@ -283,17 +278,14 @@ impl PipelineTable {
&self,
schema: &str,
name: &str,
input_version: PipelineVersion,
) -> Result<((String, TimestampNanosecond), String)> {
if let Some(pipeline) = self
.cache
.get_pipeline_str_cache(schema, name, input_version)?
{
version: PipelineVersion,
) -> Result<(String, TimestampNanosecond)> {
if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
return Ok(pipeline);
}
let mut pipeline_vec;
match self.find_pipeline(name, input_version).await {
match self.find_pipeline(name, version).await {
Ok(p) => {
METRIC_PIPELINE_TABLE_FIND_COUNT
.with_label_values(&["true"])
@@ -310,14 +302,8 @@ impl PipelineTable {
.inc();
return self
.cache
.get_failover_cache(schema, name, input_version)?
.ok_or(
PipelineNotFoundSnafu {
name,
version: input_version,
}
.build(),
);
.get_failover_cache(schema, name, version)?
.ok_or(PipelineNotFoundSnafu { name, version }.build());
}
_ => {
// if other error, we should return it
@@ -328,10 +314,7 @@ impl PipelineTable {
};
ensure!(
!pipeline_vec.is_empty(),
PipelineNotFoundSnafu {
name,
version: input_version
}
PipelineNotFoundSnafu { name, version }
);
// if the result is exact one, use it
@@ -343,9 +326,9 @@ impl PipelineTable {
name,
Some(version),
p.clone(),
input_version.is_none(),
false,
);
return Ok((p, found_schema));
return Ok(p);
}
// check if there's empty schema pipeline
@@ -359,22 +342,14 @@ impl PipelineTable {
// throw an error
let (pipeline_content, found_schema, version) =
pipeline.context(MultiPipelineWithDiffSchemaSnafu {
name: name.to_string(),
current_schema: schema.to_string(),
schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
})?;
let v = *version;
let p = (pipeline_content.clone(), v);
self.cache.insert_pipeline_str_cache(
found_schema,
name,
Some(v),
p.clone(),
input_version.is_none(),
);
Ok((p, found_schema.clone()))
self.cache
.insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
Ok(p)
}
/// Insert a pipeline into the pipeline table and compile it.

View File

@@ -1 +1,2 @@
v0.10.1
v0.11.9