mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
@@ -646,10 +646,14 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
|
||||
schemas
|
||||
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
|
||||
name,
|
||||
current_schema,
|
||||
schemas,
|
||||
))]
|
||||
MultiPipelineWithDiffSchema {
|
||||
name: String,
|
||||
current_schema: String,
|
||||
schemas: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datatypes::timestamp::TimestampNanosecond;
|
||||
use moka::sync::Cache;
|
||||
|
||||
@@ -45,12 +46,17 @@ impl PipelineCache {
|
||||
pipelines: Cache::builder()
|
||||
.max_capacity(PIPELINES_CACHE_SIZE)
|
||||
.time_to_live(PIPELINES_CACHE_TTL)
|
||||
.name("pipelines")
|
||||
.build(),
|
||||
original_pipelines: Cache::builder()
|
||||
.max_capacity(PIPELINES_CACHE_SIZE)
|
||||
.time_to_live(PIPELINES_CACHE_TTL)
|
||||
.name("original_pipelines")
|
||||
.build(),
|
||||
failover_cache: Cache::builder()
|
||||
.max_capacity(PIPELINES_CACHE_SIZE)
|
||||
.name("failover_cache")
|
||||
.build(),
|
||||
failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,13 +180,13 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
|
||||
version: PipelineVersion,
|
||||
) -> Result<Option<T>> {
|
||||
// lets try empty schema first
|
||||
let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
|
||||
if let Some(value) = cache.get(&k) {
|
||||
let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
|
||||
if let Some(value) = cache.get(&emp_key) {
|
||||
return Ok(Some(value));
|
||||
}
|
||||
// use input schema
|
||||
let k = generate_pipeline_cache_key(schema, name, version);
|
||||
if let Some(value) = cache.get(&k) {
|
||||
let schema_k = generate_pipeline_cache_key(schema, name, version);
|
||||
if let Some(value) = cache.get(&schema_k) {
|
||||
return Ok(Some(value));
|
||||
}
|
||||
|
||||
@@ -194,13 +200,24 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
|
||||
match ks.len() {
|
||||
0 => Ok(None),
|
||||
1 => Ok(Some(ks.remove(0).1)),
|
||||
_ => MultiPipelineWithDiffSchemaSnafu {
|
||||
schemas: ks
|
||||
.iter()
|
||||
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
_ => {
|
||||
debug!(
|
||||
"caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
|
||||
cache.iter().map(|e| e.0).collect::<Vec<_>>(),
|
||||
emp_key,
|
||||
schema_k,
|
||||
suffix_key
|
||||
);
|
||||
MultiPipelineWithDiffSchemaSnafu {
|
||||
name: name.to_string(),
|
||||
current_schema: schema.to_string(),
|
||||
schemas: ks
|
||||
.iter()
|
||||
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
.fail()?,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,14 +278,17 @@ impl PipelineTable {
|
||||
&self,
|
||||
schema: &str,
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
input_version: PipelineVersion,
|
||||
) -> Result<(String, TimestampNanosecond)> {
|
||||
if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
|
||||
if let Some(pipeline) = self
|
||||
.cache
|
||||
.get_pipeline_str_cache(schema, name, input_version)?
|
||||
{
|
||||
return Ok(pipeline);
|
||||
}
|
||||
|
||||
let mut pipeline_vec;
|
||||
match self.find_pipeline(name, version).await {
|
||||
match self.find_pipeline(name, input_version).await {
|
||||
Ok(p) => {
|
||||
METRIC_PIPELINE_TABLE_FIND_COUNT
|
||||
.with_label_values(&["true"])
|
||||
@@ -302,8 +305,14 @@ impl PipelineTable {
|
||||
.inc();
|
||||
return self
|
||||
.cache
|
||||
.get_failover_cache(schema, name, version)?
|
||||
.ok_or(PipelineNotFoundSnafu { name, version }.build());
|
||||
.get_failover_cache(schema, name, input_version)?
|
||||
.ok_or(
|
||||
PipelineNotFoundSnafu {
|
||||
name,
|
||||
version: input_version,
|
||||
}
|
||||
.build(),
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
// if other error, we should return it
|
||||
@@ -314,7 +323,10 @@ impl PipelineTable {
|
||||
};
|
||||
ensure!(
|
||||
!pipeline_vec.is_empty(),
|
||||
PipelineNotFoundSnafu { name, version }
|
||||
PipelineNotFoundSnafu {
|
||||
name,
|
||||
version: input_version
|
||||
}
|
||||
);
|
||||
|
||||
// if the result is exact one, use it
|
||||
@@ -342,13 +354,18 @@ impl PipelineTable {
|
||||
// throw an error
|
||||
let (pipeline_content, found_schema, version) =
|
||||
pipeline.context(MultiPipelineWithDiffSchemaSnafu {
|
||||
name: name.to_string(),
|
||||
current_schema: schema.to_string(),
|
||||
schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
|
||||
})?;
|
||||
|
||||
let v = *version;
|
||||
let p = (pipeline_content.clone(), v);
|
||||
|
||||
let with_latest = input_version.is_none();
|
||||
|
||||
self.cache
|
||||
.insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
|
||||
.insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), with_latest);
|
||||
Ok(p)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user