feat: support multivector on remote table (#2045)

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
This commit is contained in:
BubbleCal
2025-02-20 11:34:51 +08:00
committed by GitHub
parent 1865f7decf
commit 14c9ff46d1
3 changed files with 155 additions and 258 deletions

View File

@@ -10,7 +10,7 @@ use crate::index::IndexStatistics;
use crate::query::{QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table};
use crate::{DistanceType, Error};
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
@@ -24,7 +24,7 @@ use http::StatusCode;
use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use lance_datafusion::exec::OneShotExec;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
@@ -205,10 +205,10 @@ impl<S: HttpSend> RemoteTable<S> {
}
fn apply_vector_query_params(
mut body: serde_json::Value,
body: &mut serde_json::Value,
query: &VectorQueryRequest,
) -> Result<Vec<serde_json::Value>> {
Self::apply_query_params(&mut body, &query.base)?;
) -> Result<()> {
Self::apply_query_params(body, &query.base)?;
// Apply general parameters, before we dispatch based on number of query vectors.
body["prefilter"] = query.base.prefilter.into();
@@ -254,22 +254,21 @@ impl<S: HttpSend> RemoteTable<S> {
0 => {
// Server takes empty vector, not null or undefined.
body["vector"] = serde_json::Value::Array(Vec::new());
Ok(vec![body])
}
1 => {
body["vector"] = vector_to_json(&query.query_vector[0])?;
Ok(vec![body])
}
_ => {
let mut bodies = Vec::with_capacity(query.query_vector.len());
for vector in &query.query_vector {
let mut body = body.clone();
body["vector"] = vector_to_json(vector)?;
bodies.push(body);
}
Ok(bodies)
let vectors = query
.query_vector
.iter()
.map(vector_to_json)
.collect::<Result<Vec<_>>>()?;
body["vector"] = serde_json::Value::Array(vectors);
}
}
Ok(())
}
async fn check_mutable(&self) -> Result<()> {
@@ -294,7 +293,7 @@ impl<S: HttpSend> RemoteTable<S> {
&self,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>> {
let request = self.client.post(&format!("/v1/table/{}/query/", self.name));
let version = self.current_version().await;
@@ -305,28 +304,16 @@ impl<S: HttpSend> RemoteTable<S> {
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
let request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(vec![stream])
}
AnyQuery::VectorQuery(query) => {
let bodies = Self::apply_vector_query_params(body, query)?;
let mut futures = Vec::with_capacity(bodies.len());
for body in bodies {
let request = request.try_clone().unwrap().json(&body);
let future = async move {
let (request_id, response) = self.client.send(request, true).await?;
self.read_arrow_stream(&request_id, response).await
};
futures.push(future);
}
futures::future::try_join_all(futures).await
Self::apply_vector_query_params(&mut body, query)?;
}
}
let request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(stream)
}
}
@@ -498,18 +485,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let streams = self.execute_query(query, options).await?;
if streams.len() == 1 {
let stream = streams.into_iter().next().unwrap();
Ok(Arc::new(OneShotExec::new(stream)))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
Table::multi_vector_plan(stream_execs)
}
let stream = self.execute_query(query, options).await?;
Ok(Arc::new(OneShotExec::new(stream)))
}
async fn query(
@@ -517,24 +494,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
let streams = self.execute_query(query, _options).await?;
if streams.len() == 1 {
Ok(DatasetRecordBatchStream::new(
streams.into_iter().next().unwrap(),
))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
let plan = Table::multi_vector_plan(stream_execs)?;
Ok(DatasetRecordBatchStream::new(execute_plan(
plan,
Default::default(),
)?))
}
let stream = self.execute_query(query, _options).await?;
Ok(DatasetRecordBatchStream::new(stream))
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
@@ -1500,9 +1461,21 @@ mod tests {
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
let query_vectors = body["vector"].as_array().unwrap();
assert_eq!(query_vectors.len(), 2);
assert_eq!(query_vectors[0].as_array().unwrap().len(), 3);
assert_eq!(query_vectors[1].as_array().unwrap().len(), 3);
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("query_index", DataType::Int32, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
],
)
.unwrap();
let response_body = write_ipc_file(&data);
@@ -1519,8 +1492,6 @@ mod tests {
.unwrap()
.add_query_vector(vec![0.4, 0.5, 0.6])
.unwrap();
let plan = query.explain_plan(true).await.unwrap();
assert!(plan.contains("UnionExec"), "Plan: {}", plan);
let results = query
.execute()

View File

@@ -1996,8 +1996,8 @@ impl BaseTable for NativeTable {
};
let ds_ref = self.dataset.get().await?;
let mut column = query.column.clone();
let schema = ds_ref.schema();
let mut column = query.column.clone();
let mut query_vector = query.query_vector.first().cloned();
if query.query_vector.len() > 1 {