mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00

Compare commits: chore/v0.1 ... release/v0 (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | 28124abbb7 |  |
@@ -115,7 +115,6 @@ impl PipelineHandler for Instance {
             .get_pipeline_str(name, version, query_ctx)
             .await
             .context(PipelineSnafu)
-            .map(|(p, _)| p)
     }
 }
@@ -646,14 +646,10 @@ pub enum Error {
     },

     #[snafu(display(
-        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
-        name,
-        current_schema,
-        schemas,
+        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
+        schemas
     ))]
     MultiPipelineWithDiffSchema {
-        name: String,
-        current_schema: String,
         schemas: String,
         #[snafu(implicit)]
         location: Location,
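For context on the hunk above, here is a minimal sketch of how a display-annotated snafu variant shaped like the trimmed MultiPipelineWithDiffSchema is declared and raised through its generated context selector. The enum, message text, and the pick_schema helper are illustrative assumptions, not the repository's code.

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    // same shape as the variant after this change: only `schemas` is kept
    #[snafu(display("Multiple pipelines with different schemas found. schemas: {}", schemas))]
    MultiPipelineWithDiffSchema { schemas: String },
}

// hypothetical helper: refuse to pick a pipeline when several schemas define it
fn pick_schema(candidates: &[&str]) -> Result<(), Error> {
    ensure!(
        candidates.len() <= 1,
        MultiPipelineWithDiffSchemaSnafu {
            schemas: candidates.join(","),
        }
    );
    Ok(())
}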
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 use std::time::Duration;

-use common_telemetry::debug;
 use datatypes::timestamp::TimestampNanosecond;
 use moka::sync::Cache;
@@ -46,17 +45,12 @@ impl PipelineCache {
             pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
-                .name("pipelines")
                 .build(),
             original_pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
-                .name("original_pipelines")
                 .build(),
-            failover_cache: Cache::builder()
-                .max_capacity(PIPELINES_CACHE_SIZE)
-                .name("failover_cache")
-                .build(),
+            failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
         }
     }
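As background for the builder calls above, here is a minimal sketch of a bounded, TTL-expiring moka::sync::Cache. The capacity, TTL, and key values are illustrative stand-ins for PIPELINES_CACHE_SIZE and PIPELINES_CACHE_TTL, not the repository's constants.

use std::time::Duration;

use moka::sync::Cache;

fn main() {
    // stand-ins for PIPELINES_CACHE_SIZE / PIPELINES_CACHE_TTL
    let pipelines: Cache<String, String> = Cache::builder()
        .max_capacity(10_000)
        .time_to_live(Duration::from_secs(30))
        .build();

    pipelines.insert("public/my_pipeline".to_string(), "pipeline yaml ...".to_string());
    // entries expire once the TTL elapses or are evicted when capacity is exceeded
    assert!(pipelines.get("public/my_pipeline").is_some());
}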
@@ -109,7 +103,7 @@ impl PipelineCache {
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(Arc<Pipeline>, String)>> {
+    ) -> Result<Option<Arc<Pipeline>>> {
         get_cache_generic(&self.pipelines, schema, name, version)
     }

@@ -118,7 +112,7 @@ impl PipelineCache {
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<((String, TimestampNanosecond), String)>> {
+    ) -> Result<Option<(String, TimestampNanosecond)>> {
         get_cache_generic(&self.failover_cache, schema, name, version)
     }

@@ -127,7 +121,7 @@ impl PipelineCache {
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<((String, TimestampNanosecond), String)>> {
+    ) -> Result<Option<(String, TimestampNanosecond)>> {
         get_cache_generic(&self.original_pipelines, schema, name, version)
     }
@@ -178,16 +172,16 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
     schema: &str,
     name: &str,
     version: PipelineVersion,
-) -> Result<Option<(T, String)>> {
+) -> Result<Option<T>> {
     // lets try empty schema first
-    let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
-    if let Some(value) = cache.get(&emp_key) {
-        return Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())));
+    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
     }
     // use input schema
-    let schema_k = generate_pipeline_cache_key(schema, name, version);
-    if let Some(value) = cache.get(&schema_k) {
-        return Ok(Some((value, schema.to_string())));
+    let k = generate_pipeline_cache_key(schema, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
     }

     // try all schemas
@@ -199,32 +193,14 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(

     match ks.len() {
         0 => Ok(None),
-        1 => {
-            let (key, value) = ks.remove(0);
-            if let Some((key_schema, _)) = key.split_once("/") {
-                Ok(Some((value, key_schema.to_string())))
-            } else {
-                Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())))
-            }
-        }
-        _ => {
-            debug!(
-                "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
-                cache.iter().map(|e| e.0).collect::<Vec<_>>(),
-                emp_key,
-                schema_k,
-                suffix_key
-            );
-            MultiPipelineWithDiffSchemaSnafu {
-                name: name.to_string(),
-                current_schema: schema.to_string(),
-                schemas: ks
-                    .iter()
-                    .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
-                    .collect::<Vec<_>>()
-                    .join(","),
-            }
-            .fail()?
-        }
+        1 => Ok(Some(ks.remove(0).1)),
+        _ => MultiPipelineWithDiffSchemaSnafu {
+            schemas: ks
+                .iter()
+                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
+                .collect::<Vec<_>>()
+                .join(","),
+        }
+        .fail()?,
     }
 }
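To make the lookup order in get_cache_generic easier to follow, here is a self-contained, std-only sketch of the same idea: try the empty-schema key first, then the caller's schema, then scan every schema, and treat multiple matches as an error. The HashMap stand-in, key layout, and all names are hypothetical, not the crate's API.

use std::collections::HashMap;

const EMPTY_SCHEMA_NAME: &str = "";

// hypothetical key layout: "<schema>/<name>"
fn key(schema: &str, name: &str) -> String {
    format!("{schema}/{name}")
}

fn lookup<T: Clone>(
    cache: &HashMap<String, T>,
    schema: &str,
    name: &str,
) -> Result<Option<T>, String> {
    // 1. a pipeline stored under the empty ("global") schema wins
    if let Some(v) = cache.get(&key(EMPTY_SCHEMA_NAME, name)) {
        return Ok(Some(v.clone()));
    }
    // 2. then the caller's current schema
    if let Some(v) = cache.get(&key(schema, name)) {
        return Ok(Some(v.clone()));
    }
    // 3. finally scan every schema for the same pipeline name
    let suffix = format!("/{name}");
    let hits: Vec<&T> = cache
        .iter()
        .filter(|(k, _)| k.ends_with(&suffix))
        .map(|(_, v)| v)
        .collect();
    match hits.len() {
        0 => Ok(None),
        1 => Ok(Some(hits[0].clone())),
        // several schemas define this name: ambiguous, mirror MultiPipelineWithDiffSchema
        _ => Err(format!("multiple pipelines named `{name}` across schemas")),
    }
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(key("public", "nginx_logs"), "compiled pipeline".to_string());
    // looked up from another schema, still found via the full scan
    println!("{:?}", lookup(&cache, "other_schema", "nginx_logs"));
}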
@@ -205,7 +205,7 @@ impl PipelineOperator {
         name: &str,
         version: PipelineVersion,
         query_ctx: QueryContextRef,
-    ) -> Result<((String, TimestampNanosecond), String)> {
+    ) -> Result<(String, TimestampNanosecond)> {
         let schema = query_ctx.current_schema();
         self.create_pipeline_table_if_not_exists(query_ctx.clone())
             .await?;
@@ -261,19 +261,14 @@ impl PipelineTable {
         version: PipelineVersion,
     ) -> Result<Arc<Pipeline>> {
         if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
-            return Ok(pipeline.0);
+            return Ok(pipeline);
         }

-        let (pipeline, found_schema) = self.get_pipeline_str(schema, name, version).await?;
+        let pipeline = self.get_pipeline_str(schema, name, version).await?;
         let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

-        self.cache.insert_pipeline_cache(
-            &found_schema,
-            name,
-            version,
-            compiled_pipeline.clone(),
-            version.is_none(),
-        );
+        self.cache
+            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
         Ok(compiled_pipeline)
     }
@@ -283,17 +278,14 @@ impl PipelineTable {
         &self,
         schema: &str,
         name: &str,
-        input_version: PipelineVersion,
-    ) -> Result<((String, TimestampNanosecond), String)> {
-        if let Some(pipeline) = self
-            .cache
-            .get_pipeline_str_cache(schema, name, input_version)?
-        {
+        version: PipelineVersion,
+    ) -> Result<(String, TimestampNanosecond)> {
+        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
             return Ok(pipeline);
         }

         let mut pipeline_vec;
-        match self.find_pipeline(name, input_version).await {
+        match self.find_pipeline(name, version).await {
             Ok(p) => {
                 METRIC_PIPELINE_TABLE_FIND_COUNT
                     .with_label_values(&["true"])
@@ -310,14 +302,8 @@ impl PipelineTable {
                     .inc();
                 return self
                     .cache
-                    .get_failover_cache(schema, name, input_version)?
-                    .ok_or(
-                        PipelineNotFoundSnafu {
-                            name,
-                            version: input_version,
-                        }
-                        .build(),
-                    );
+                    .get_failover_cache(schema, name, version)?
+                    .ok_or(PipelineNotFoundSnafu { name, version }.build());
             }
             _ => {
                 // if other error, we should return it
@@ -328,10 +314,7 @@ impl PipelineTable {
         };
         ensure!(
             !pipeline_vec.is_empty(),
-            PipelineNotFoundSnafu {
-                name,
-                version: input_version
-            }
+            PipelineNotFoundSnafu { name, version }
         );

         // if the result is exact one, use it
@@ -343,9 +326,9 @@ impl PipelineTable {
                 name,
                 Some(version),
                 p.clone(),
-                input_version.is_none(),
+                false,
             );
-            return Ok((p, found_schema));
+            return Ok(p);
         }

         // check if there's empty schema pipeline
@@ -359,22 +342,14 @@ impl PipelineTable {
         // throw an error
         let (pipeline_content, found_schema, version) =
             pipeline.context(MultiPipelineWithDiffSchemaSnafu {
-                name: name.to_string(),
-                current_schema: schema.to_string(),
                 schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
             })?;

         let v = *version;
         let p = (pipeline_content.clone(), v);

-        self.cache.insert_pipeline_str_cache(
-            found_schema,
-            name,
-            Some(v),
-            p.clone(),
-            input_version.is_none(),
-        );
-        Ok((p, found_schema.clone()))
+        self.cache
+            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
+        Ok(p)
     }

     /// Insert a pipeline into the pipeline table and compile it.
@@ -1 +1,2 @@
-v0.10.1
+v0.11.9