mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
@@ -115,6 +115,7 @@ impl PipelineHandler for Instance {
|
||||
.get_pipeline_str(name, version, query_ctx)
|
||||
.await
|
||||
.context(PipelineSnafu)
|
||||
.map(|(p, _)| p)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ impl PipelineCache {
|
||||
schema: &str,
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
) -> Result<Option<Arc<Pipeline>>> {
|
||||
) -> Result<Option<(Arc<Pipeline>, String)>> {
|
||||
get_cache_generic(&self.pipelines, schema, name, version)
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ impl PipelineCache {
|
||||
schema: &str,
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
) -> Result<Option<(String, TimestampNanosecond)>> {
|
||||
) -> Result<Option<((String, TimestampNanosecond), String)>> {
|
||||
get_cache_generic(&self.failover_cache, schema, name, version)
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ impl PipelineCache {
|
||||
schema: &str,
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
) -> Result<Option<(String, TimestampNanosecond)>> {
|
||||
) -> Result<Option<((String, TimestampNanosecond), String)>> {
|
||||
get_cache_generic(&self.original_pipelines, schema, name, version)
|
||||
}
|
||||
|
||||
@@ -178,16 +178,16 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
|
||||
schema: &str,
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
) -> Result<Option<T>> {
|
||||
) -> Result<Option<(T, String)>> {
|
||||
// lets try empty schema first
|
||||
let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
|
||||
if let Some(value) = cache.get(&emp_key) {
|
||||
return Ok(Some(value));
|
||||
return Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())));
|
||||
}
|
||||
// use input schema
|
||||
let schema_k = generate_pipeline_cache_key(schema, name, version);
|
||||
if let Some(value) = cache.get(&schema_k) {
|
||||
return Ok(Some(value));
|
||||
return Ok(Some((value, schema.to_string())));
|
||||
}
|
||||
|
||||
// try all schemas
|
||||
@@ -199,7 +199,14 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
|
||||
|
||||
match ks.len() {
|
||||
0 => Ok(None),
|
||||
1 => Ok(Some(ks.remove(0).1)),
|
||||
1 => {
|
||||
let (key, value) = ks.remove(0);
|
||||
if let Some((key_schema, _)) = key.split_once("/") {
|
||||
Ok(Some((value, key_schema.to_string())))
|
||||
} else {
|
||||
Ok(Some((value, EMPTY_SCHEMA_NAME.to_string())))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
debug!(
|
||||
"caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
|
||||
|
||||
@@ -205,7 +205,7 @@ impl PipelineOperator {
|
||||
name: &str,
|
||||
version: PipelineVersion,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<(String, TimestampNanosecond)> {
|
||||
) -> Result<((String, TimestampNanosecond), String)> {
|
||||
let schema = query_ctx.current_schema();
|
||||
self.create_pipeline_table_if_not_exists(query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
@@ -261,14 +261,14 @@ impl PipelineTable {
|
||||
version: PipelineVersion,
|
||||
) -> Result<Arc<Pipeline>> {
|
||||
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
|
||||
return Ok(pipeline);
|
||||
return Ok(pipeline.0);
|
||||
}
|
||||
|
||||
let pipeline = self.get_pipeline_str(schema, name, version).await?;
|
||||
let (pipeline, found_schema) = self.get_pipeline_str(schema, name, version).await?;
|
||||
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
|
||||
|
||||
self.cache.insert_pipeline_cache(
|
||||
schema,
|
||||
&found_schema,
|
||||
name,
|
||||
version,
|
||||
compiled_pipeline.clone(),
|
||||
@@ -284,7 +284,7 @@ impl PipelineTable {
|
||||
schema: &str,
|
||||
name: &str,
|
||||
input_version: PipelineVersion,
|
||||
) -> Result<(String, TimestampNanosecond)> {
|
||||
) -> Result<((String, TimestampNanosecond), String)> {
|
||||
if let Some(pipeline) = self
|
||||
.cache
|
||||
.get_pipeline_str_cache(schema, name, input_version)?
|
||||
@@ -345,7 +345,7 @@ impl PipelineTable {
|
||||
p.clone(),
|
||||
input_version.is_none(),
|
||||
);
|
||||
return Ok(p);
|
||||
return Ok((p, found_schema));
|
||||
}
|
||||
|
||||
// check if there's empty schema pipeline
|
||||
@@ -374,7 +374,7 @@ impl PipelineTable {
|
||||
p.clone(),
|
||||
input_version.is_none(),
|
||||
);
|
||||
Ok(p)
|
||||
Ok((p, found_schema.clone()))
|
||||
}
|
||||
|
||||
/// Insert a pipeline into the pipeline table and compile it.
|
||||
|
||||
Reference in New Issue
Block a user