feat: add explain plan remote api (#2263)

Add explain plan remote api
This commit is contained in:
LuQQiu
2025-03-26 11:22:40 -07:00
committed by GitHub
parent 79fa745130
commit 698f329598
7 changed files with 235 additions and 138 deletions

View File

@@ -325,24 +325,11 @@ impl<S: HttpSend> RemoteTable<S> {
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let request = self.client.post(&format!("/v1/table/{}/query/", self.name));
let version = self.current_version().await;
let mut body = serde_json::json!({ "version": version });
let requests = match query {
AnyQuery::Query(query) => {
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![request.json(&body)]
}
AnyQuery::VectorQuery(query) => {
let bodies = self.apply_vector_query_params(body, query)?;
bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect()
}
};
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
@@ -351,6 +338,22 @@ impl<S: HttpSend> RemoteTable<S> {
let streams = futures::future::try_join_all(futures).await?;
Ok(streams)
}
async fn prepare_query_bodies(&self, query: &AnyQuery) -> Result<Vec<serde_json::Value>> {
let version = self.current_version().await;
let base_body = serde_json::json!({ "version": version });
match query {
AnyQuery::Query(query) => {
let mut body = base_body.clone();
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
Ok(vec![body])
}
AnyQuery::VectorQuery(query) => self.apply_vector_query_params(base_body, query),
}
}
}
#[derive(Deserialize)]
@@ -559,6 +562,52 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
)?))
}
}
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.name));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
.into_iter()
.map(|query_body| {
let explain_request = serde_json::json!({
"verbose": verbose,
"query": query_body
});
base_request.try_clone().unwrap().json(&explain_request)
})
.collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse explain plan: {}", e).into(),
request_id,
status_code: None,
})
});
let plan_texts = futures::future::try_join_all(futures).await?;
let final_plan = if plan_texts.len() > 1 {
plan_texts
.into_iter()
.enumerate()
.map(|(i, plan)| format!("--- Plan #{} ---\n{}", i + 1, plan))
.collect::<Vec<_>>()
.join("\n\n")
} else {
plan_texts.into_iter().next().unwrap_or_default()
};
Ok(final_plan)
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
let request = self
@@ -581,6 +630,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(0) // TODO: support returning number of modified rows once supported in SaaS.
}
async fn delete(&self, predicate: &str) -> Result<()> {
self.check_mutable().await?;
let body = serde_json::json!({ "predicate": predicate });