feat: Add query pipeline http api (#5819)

* feat(pipeline): add query pipeline http api.

* chore(pipeline): rename get pipeline method

* refactor(pipeline): Also insert the pipeline string into the cache after inserting into the table.

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
This commit is contained in:
Lin Yihai
2025-04-16 18:17:20 +08:00
committed by GitHub
parent 55c9a0de42
commit 7274ceba30
10 changed files with 201 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -12087,6 +12087,7 @@ dependencies = [
"tower 0.5.2",
"url",
"uuid",
"yaml-rust",
"zstd 0.13.2",
]

View File

@@ -19,6 +19,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use datatypes::timestamp::TimestampNanosecond;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
@@ -103,6 +104,18 @@ impl PipelineHandler for Instance {
/// Builds a [`Pipeline`] from its string form.
///
/// Delegates to [`PipelineOperator::build_pipeline`]; a failure there is
/// wrapped into the server error type via `PipelineSnafu`.
fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
    let built = PipelineOperator::build_pipeline(pipeline);
    built.context(PipelineSnafu)
}
/// Fetches the original (uncompiled) pipeline string for `name` at `version`.
///
/// The lookup is delegated to this instance's pipeline operator. On success
/// the pipeline text is returned together with its version timestamp; an
/// operator failure is wrapped into the server error type via `PipelineSnafu`.
async fn get_pipeline_str(
    &self,
    name: &str,
    version: PipelineVersion,
    query_ctx: QueryContextRef,
) -> ServerResult<(String, TimestampNanosecond)> {
    let lookup = self
        .pipeline_operator
        .get_pipeline_str(name, version, query_ctx)
        .await;
    lookup.context(PipelineSnafu)
}
}
impl Instance {

View File

@@ -20,6 +20,7 @@ use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use datatypes::timestamp::TimestampNanosecond;
use futures::FutureExt;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
@@ -198,6 +199,29 @@ impl PipelineOperator {
.await
}
/// Get the original (uncompiled) pipeline string by name.
///
/// Makes sure the pipeline table exists, then looks the pipeline up in the
/// current schema through the pipeline table cached for the current catalog.
/// The time spent on the lookup is observed in
/// `METRIC_PIPELINE_RETRIEVE_HISTOGRAM`, labelled with whether it succeeded.
///
/// Fails with `PipelineTableNotFoundSnafu` when no pipeline table is cached
/// for the catalog; in that case no metric is recorded.
pub async fn get_pipeline_str(
    &self,
    name: &str,
    version: PipelineVersion,
    query_ctx: QueryContextRef,
) -> Result<(String, TimestampNanosecond)> {
    let schema = query_ctx.current_schema();
    self.create_pipeline_table_if_not_exists(query_ctx.clone())
        .await?;

    // Only the retrieval itself is timed, not the table creation above.
    let started = Instant::now();
    let table = self
        .get_pipeline_table_from_cache(query_ctx.current_catalog())
        .context(PipelineTableNotFoundSnafu)?;

    let outcome = table.get_pipeline_str(&schema, name, version).await;
    METRIC_PIPELINE_RETRIEVE_HISTOGRAM
        .with_label_values(&[&outcome.is_ok().to_string()])
        .observe(started.elapsed().as_secs_f64());
    outcome
}
/// Insert a pipeline into the pipeline table.
pub async fn insert_pipeline(
&self,

View File

@@ -69,6 +69,7 @@ pub struct PipelineTable {
table: TableRef,
query_engine: QueryEngineRef,
pipelines: Cache<String, Arc<Pipeline>>,
original_pipelines: Cache<String, (String, TimestampNanosecond)>,
}
impl PipelineTable {
@@ -88,6 +89,10 @@ impl PipelineTable {
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.build(),
original_pipelines: Cache::builder()
.max_capacity(PIPELINES_CACHE_SIZE)
.time_to_live(PIPELINES_CACHE_TTL)
.build(),
}
}
@@ -273,10 +278,7 @@ impl PipelineTable {
return Ok(pipeline);
}
let pipeline = self
.find_pipeline(schema, name, version)
.await?
.context(PipelineNotFoundSnafu { name, version })?;
let pipeline = self.get_pipeline_str(schema, name, version).await?;
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
self.pipelines.insert(
@@ -286,6 +288,31 @@ impl PipelineTable {
Ok(compiled_pipeline)
}
/// Get the original (uncompiled) pipeline string and its version by name.
///
/// The result is served from the `original_pipelines` cache when present.
/// On a cache miss the pipeline is fetched from the pipeline table and stored
/// in that cache before being returned. Nothing is compiled here.
///
/// # Errors
///
/// Propagates any error from the table lookup, and fails with
/// `PipelineNotFoundSnafu` when the lookup yields no pipeline.
pub async fn get_pipeline_str(
    &self,
    schema: &str,
    name: &str,
    version: PipelineVersion,
) -> Result<(String, TimestampNanosecond)> {
    // Build the cache key once; it is used for both the lookup and the insert.
    let cache_key = generate_pipeline_cache_key(schema, name, version);
    if let Some(pipeline) = self.original_pipelines.get(&cache_key) {
        return Ok(pipeline);
    }

    let pipeline = self
        .find_pipeline(schema, name, version)
        .await?
        .context(PipelineNotFoundSnafu { name, version })?;
    self.original_pipelines.insert(cache_key, pipeline.clone());
    Ok(pipeline)
}
/// Insert a pipeline into the pipeline table and compile it.
/// The compiled pipeline will be inserted into the cache.
pub async fn insert_and_compile(
@@ -310,6 +337,15 @@ impl PipelineTable {
generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
compiled_pipeline.clone(),
);
self.original_pipelines.insert(
generate_pipeline_cache_key(schema, name, None),
(pipeline.to_owned(), TimestampNanosecond(version)),
);
self.original_pipelines.insert(
generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
(pipeline.to_owned(), TimestampNanosecond(version)),
);
}
Ok((version, compiled_pipeline))
@@ -392,6 +428,10 @@ impl PipelineTable {
.remove(&generate_pipeline_cache_key(schema, name, version));
self.pipelines
.remove(&generate_pipeline_cache_key(schema, name, None));
self.original_pipelines
.remove(&generate_pipeline_cache_key(schema, name, version));
self.original_pipelines
.remove(&generate_pipeline_cache_key(schema, name, None));
Ok(Some(()))
}

View File

@@ -928,6 +928,10 @@ impl HttpServer {
fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/logs", routing::post(event::log_ingester))
.route(
"/pipelines/{pipeline_name}",
routing::get(event::query_pipeline),
)
.route(
"/pipelines/{pipeline_name}",
routing::post(event::add_pipeline),
@@ -947,6 +951,10 @@ impl HttpServer {
fn route_pipelines<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/ingest", routing::post(event::log_ingester))
.route(
"/pipelines/{pipeline_name}",
routing::get(event::query_pipeline),
)
.route(
"/pipelines/{pipeline_name}",
routing::post(event::add_pipeline),

View File

@@ -147,6 +147,41 @@ where
}
}
/// HTTP handler that returns the stored source of a pipeline.
///
/// The pipeline name comes from the request path and an optional `version`
/// from the query string. The response carries the pipeline name, a version
/// string, the elapsed time in milliseconds and the pipeline text.
///
/// # Errors
///
/// Fails with `InvalidParameterSnafu` when the pipeline name is empty, with
/// `PipelineSnafu` when the version string cannot be converted, and
/// propagates any error from the log handler's lookup.
#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    // Echo the caller-supplied version string when present. The stored
    // timestamp is formatted lazily, only when no version was requested.
    // NOTE(review): `add_pipeline` formats its version with
    // `to_timezone_aware_string(None)`; confirm both formats are meant to differ.
    let version_str = query_params
        .version
        .unwrap_or_else(|| pipeline_version.0.to_iso8601_string());

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        version_str,
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}
#[axum_macros::debug_handler]
pub async fn add_pipeline(
State(state): State<LogState>,
@@ -189,6 +224,7 @@ pub async fn add_pipeline(
pipeline_name,
pipeline.0.to_timezone_aware_string(None),
start.elapsed().as_millis() as u64,
None,
)
})
.map_err(|e| {
@@ -231,6 +267,7 @@ pub async fn delete_pipeline(
pipeline_name,
version_str,
start.elapsed().as_millis() as u64,
None,
)
} else {
GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)

View File

@@ -30,10 +30,19 @@ pub struct GreptimedbManageResponse {
}
impl GreptimedbManageResponse {
pub fn from_pipeline(name: String, version: String, execution_time_ms: u64) -> Self {
pub fn from_pipeline(
name: String,
version: String,
execution_time_ms: u64,
pipeline: Option<String>,
) -> Self {
GreptimedbManageResponse {
manage_result: ManageResult::Pipelines {
pipelines: vec![PipelineOutput { name, version }],
pipelines: vec![PipelineOutput {
name,
version,
pipeline,
}],
},
execution_time_ms,
}
@@ -68,6 +77,8 @@ pub enum ManageResult {
/// A single pipeline entry reported in a management response.
pub struct PipelineOutput {
/// Name of the pipeline.
name: String,
/// Version string of the pipeline.
version: String,
/// Pipeline text, when the response carries it; left out of the serialized
/// output entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pipeline: Option<String>,
}
impl IntoResponse for GreptimedbManageResponse {
@@ -109,6 +120,7 @@ mod tests {
pipelines: vec![PipelineOutput {
name: "test_name".to_string(),
version: "test_version".to_string(),
pipeline: None,
}],
},
execution_time_ms: 42,

View File

@@ -34,6 +34,7 @@ use api::v1::RowInsertRequests;
use async_trait::async_trait;
use catalog::CatalogManager;
use common_query::Output;
use datatypes::timestamp::TimestampNanosecond;
use headers::HeaderValue;
use log_query::LogQuery;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -165,6 +166,14 @@ pub trait PipelineHandler {
/// Build a pipeline from a string.
fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
/// Get the original pipeline string by name and version.
///
/// On success, returns the pipeline text together with a
/// `TimestampNanosecond` identifying the version of the returned pipeline.
async fn get_pipeline_str(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<(String, TimestampNanosecond)>;
}
/// Handle log query requests.

View File

@@ -100,3 +100,4 @@ session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"
yaml-rust = "0.4"

View File

@@ -50,6 +50,7 @@ use tests_integration::test_util::{
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
StorageType,
};
use yaml_rust::YamlLoader;
#[macro_export]
macro_rules! http_test {
@@ -1348,6 +1349,55 @@ transform:
.as_str()
.unwrap()
.to_string();
let encoded_ver_str: String =
url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
// get pipeline
let res = client
.get(format!("/v1/pipelines/test?version={}", encoded_ver_str).as_str())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let content = res.text().await;
let content = serde_json::from_str(&content);
let content: Value = content.unwrap();
let pipeline_yaml = content
.get("pipelines")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.get("pipeline")
.unwrap()
.as_str()
.unwrap();
let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap();
let body_yaml = YamlLoader::load_from_str(body).unwrap();
assert_eq!(docs, body_yaml);
// Do not specify version, get the latest version
let res = client.get("/v1/pipelines/test").send().await;
assert_eq!(res.status(), StatusCode::OK);
let content = res.text().await;
let content = serde_json::from_str(&content);
let content: Value = content.unwrap();
let pipeline_yaml = content
.get("pipelines")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.get("pipeline")
.unwrap()
.as_str()
.unwrap();
let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap();
assert_eq!(docs, body_yaml);
// 2. write data
let data_body = r#"