mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-22 21:09:58 +00:00
feat: support namespace server side query (#2811)
Currently a table in a namespace is still backed with a `NativeTable`, which means after getting the location of the table and optional storage options override from `namespace.describe_table`, all things work like a normal local table. However, namespace also supports `query_table`, which is exactly the same API as remote table. This PR adds a `server_side_query` capability, when enabled, it runs the query by calling `namespace.query_table`. For namespace that implements the operation (e.g. REST namespace), this could hit a backend server that could execute the query faster (e.g. using a distributed engine).
This commit is contained in:
@@ -408,6 +408,7 @@ impl OpenTableBuilder {
|
||||
index_cache_size: None,
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
},
|
||||
embedding_registry,
|
||||
}
|
||||
@@ -1086,6 +1087,7 @@ pub struct ConnectNamespaceBuilder {
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
server_side_query_enabled: bool,
|
||||
}
|
||||
|
||||
impl ConnectNamespaceBuilder {
|
||||
@@ -1097,6 +1099,7 @@ impl ConnectNamespaceBuilder {
|
||||
read_consistency_interval: None,
|
||||
embedding_registry: None,
|
||||
session: None,
|
||||
server_side_query_enabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1151,6 +1154,18 @@ impl ConnectNamespaceBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable server-side query execution.
|
||||
///
|
||||
/// When enabled, queries will be executed on the namespace server instead of
|
||||
/// locally. This can improve performance by reducing data transfer and
|
||||
/// leveraging server-side compute resources.
|
||||
///
|
||||
/// Default is `false` (queries executed locally).
|
||||
pub fn server_side_query(mut self, enabled: bool) -> Self {
|
||||
self.server_side_query_enabled = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// Execute the connection
|
||||
pub async fn execute(self) -> Result<Connection> {
|
||||
use crate::database::namespace::LanceNamespaceDatabase;
|
||||
@@ -1162,6 +1177,7 @@ impl ConnectNamespaceBuilder {
|
||||
self.storage_options,
|
||||
self.read_consistency_interval,
|
||||
self.session,
|
||||
self.server_side_query_enabled,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
@@ -24,6 +24,7 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
|
||||
use futures::stream;
|
||||
use lance::dataset::ReadParams;
|
||||
use lance_datafusion::utils::StreamingWriteSource;
|
||||
use lance_namespace::LanceNamespace;
|
||||
|
||||
use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt};
|
||||
use crate::error::Result;
|
||||
@@ -77,7 +78,7 @@ pub struct TableNamesRequest {
|
||||
}
|
||||
|
||||
/// A request to open a table
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub struct OpenTableRequest {
|
||||
pub name: String,
|
||||
/// The namespace to open the table from. Empty list represents root namespace.
|
||||
@@ -87,6 +88,22 @@ pub struct OpenTableRequest {
|
||||
/// Optional custom location for the table. If not provided, the database will
|
||||
/// derive a location based on its URI and the table name.
|
||||
pub location: Option<String>,
|
||||
/// Optional namespace client for server-side query execution.
|
||||
/// When set, queries will be executed on the namespace server instead of locally.
|
||||
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OpenTableRequest {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("OpenTableRequest")
|
||||
.field("name", &self.name)
|
||||
.field("namespace", &self.namespace)
|
||||
.field("index_cache_size", &self.index_cache_size)
|
||||
.field("lance_read_params", &self.lance_read_params)
|
||||
.field("location", &self.location)
|
||||
.field("namespace_client", &self.namespace_client)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableRequest) -> OpenTableRequest + Send>;
|
||||
@@ -170,6 +187,9 @@ pub struct CreateTableRequest {
|
||||
/// Optional custom location for the table. If not provided, the database will
|
||||
/// derive a location based on its URI and the table name.
|
||||
pub location: Option<String>,
|
||||
/// Optional namespace client for server-side query execution.
|
||||
/// When set, queries will be executed on the namespace server instead of locally.
|
||||
pub namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
}
|
||||
|
||||
impl CreateTableRequest {
|
||||
@@ -181,6 +201,7 @@ impl CreateTableRequest {
|
||||
mode: CreateTableMode::default(),
|
||||
write_options: WriteOptions::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,6 +641,7 @@ impl ListingDatabase {
|
||||
index_cache_size: None,
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
};
|
||||
let req = (callback)(req);
|
||||
let table = self.open_table(req).await?;
|
||||
@@ -768,6 +769,7 @@ impl Database for ListingDatabase {
|
||||
self.store_wrapper.clone(),
|
||||
Some(write_params),
|
||||
self.read_consistency_interval,
|
||||
request.namespace_client,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -839,6 +841,7 @@ impl Database for ListingDatabase {
|
||||
self.store_wrapper.clone(),
|
||||
None,
|
||||
self.read_consistency_interval,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -910,6 +913,7 @@ impl Database for ListingDatabase {
|
||||
self.store_wrapper.clone(),
|
||||
Some(read_params),
|
||||
self.read_consistency_interval,
|
||||
request.namespace_client,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
@@ -1011,6 +1015,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1075,6 +1080,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1133,6 +1139,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1168,6 +1175,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1207,6 +1215,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1246,6 +1255,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1300,6 +1310,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1357,6 +1368,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1442,6 +1454,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1528,6 +1541,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1621,6 +1635,7 @@ mod tests {
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -40,6 +40,8 @@ pub struct LanceNamespaceDatabase {
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
// database URI
|
||||
uri: String,
|
||||
// Whether to enable server-side query execution
|
||||
server_side_query_enabled: bool,
|
||||
}
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
@@ -49,6 +51,7 @@ impl LanceNamespaceDatabase {
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
server_side_query_enabled: bool,
|
||||
) -> Result<Self> {
|
||||
let mut builder = ConnectBuilder::new(ns_impl);
|
||||
for (key, value) in ns_properties.clone() {
|
||||
@@ -67,6 +70,7 @@ impl LanceNamespaceDatabase {
|
||||
read_consistency_interval,
|
||||
session,
|
||||
uri: format!("namespace://{}", ns_impl),
|
||||
server_side_query_enabled,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -76,6 +80,7 @@ impl std::fmt::Debug for LanceNamespaceDatabase {
|
||||
f.debug_struct("LanceNamespaceDatabase")
|
||||
.field("storage_options", &self.storage_options)
|
||||
.field("read_consistency_interval", &self.read_consistency_interval)
|
||||
.field("server_side_query_enabled", &self.server_side_query_enabled)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -290,6 +295,10 @@ impl Database for LanceNamespaceDatabase {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let namespace_client = self
|
||||
.server_side_query_enabled
|
||||
.then(|| self.namespace.clone());
|
||||
|
||||
return listing_db
|
||||
.open_table(OpenTableRequest {
|
||||
name: request.name.clone(),
|
||||
@@ -297,6 +306,7 @@ impl Database for LanceNamespaceDatabase {
|
||||
index_cache_size: None,
|
||||
lance_read_params: None,
|
||||
location: Some(location),
|
||||
namespace_client,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@@ -333,12 +343,16 @@ impl Database for LanceNamespaceDatabase {
|
||||
let listing_db = self
|
||||
.create_listing_database(
|
||||
&location,
|
||||
table_id,
|
||||
table_id.clone(),
|
||||
user_storage_options,
|
||||
create_empty_response.storage_options.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let namespace_client = self
|
||||
.server_side_query_enabled
|
||||
.then(|| self.namespace.clone());
|
||||
|
||||
let create_request = DbCreateTableRequest {
|
||||
name: request.name,
|
||||
namespace: request.namespace,
|
||||
@@ -346,7 +360,9 @@ impl Database for LanceNamespaceDatabase {
|
||||
mode: request.mode,
|
||||
write_options: request.write_options,
|
||||
location: Some(location),
|
||||
namespace_client,
|
||||
};
|
||||
|
||||
listing_db.create_table(create_request).await
|
||||
}
|
||||
|
||||
@@ -380,19 +396,25 @@ impl Database for LanceNamespaceDatabase {
|
||||
let listing_db = self
|
||||
.create_listing_database(
|
||||
&location,
|
||||
table_id,
|
||||
table_id.clone(),
|
||||
user_storage_options,
|
||||
response.storage_options.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let namespace_client = self
|
||||
.server_side_query_enabled
|
||||
.then(|| self.namespace.clone());
|
||||
|
||||
let open_request = OpenTableRequest {
|
||||
name: request.name.clone(),
|
||||
namespace: request.namespace.clone(),
|
||||
index_cache_size: request.index_cache_size,
|
||||
lance_read_params: request.lance_read_params,
|
||||
location: Some(location),
|
||||
namespace_client,
|
||||
};
|
||||
|
||||
listing_db.open_table(open_request).await
|
||||
}
|
||||
|
||||
|
||||
@@ -417,6 +417,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
|
||||
index_cache_size: None,
|
||||
lance_read_params: None,
|
||||
location: None,
|
||||
namespace_client: None,
|
||||
};
|
||||
let req = (callback)(req);
|
||||
self.open_table(req).await
|
||||
|
||||
@@ -40,6 +40,11 @@ use lance_index::vector::pq::PQBuildParams;
|
||||
use lance_index::vector::sq::builder::SQBuildParams;
|
||||
use lance_index::DatasetIndexExt;
|
||||
use lance_index::IndexType;
|
||||
use lance_namespace::models::{
|
||||
QueryTableRequest as NsQueryTableRequest, QueryTableRequestFullTextQuery,
|
||||
QueryTableRequestVector, StringFtsQuery,
|
||||
};
|
||||
use lance_namespace::LanceNamespace;
|
||||
use lance_table::format::Manifest;
|
||||
use lance_table::io::commit::ManifestNamingScheme;
|
||||
use log::info;
|
||||
@@ -1480,7 +1485,7 @@ impl NativeTableExt for Arc<dyn BaseTable> {
|
||||
}
|
||||
|
||||
/// A table in a LanceDB database.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct NativeTable {
|
||||
name: String,
|
||||
namespace: Vec<String>,
|
||||
@@ -1490,6 +1495,22 @@ pub struct NativeTable {
|
||||
// This comes from the connection options. We store here so we can pass down
|
||||
// to the dataset when we recreate it (for example, in checkout_latest).
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
// Optional namespace client for server-side query execution.
|
||||
// When set, queries will be executed on the namespace server instead of locally.
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NativeTable {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("NativeTable")
|
||||
.field("name", &self.name)
|
||||
.field("namespace", &self.namespace)
|
||||
.field("id", &self.id)
|
||||
.field("uri", &self.uri)
|
||||
.field("read_consistency_interval", &self.read_consistency_interval)
|
||||
.field("namespace_client", &self.namespace_client)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for NativeTable {
|
||||
@@ -1524,7 +1545,7 @@ impl NativeTable {
|
||||
/// * A [NativeTable] object.
|
||||
pub async fn open(uri: &str) -> Result<Self> {
|
||||
let name = Self::get_table_name(uri)?;
|
||||
Self::open_with_params(uri, &name, vec![], None, None, None).await
|
||||
Self::open_with_params(uri, &name, vec![], None, None, None, None).await
|
||||
}
|
||||
|
||||
/// Opens an existing Table
|
||||
@@ -1534,10 +1555,12 @@ impl NativeTable {
|
||||
/// * `base_path` - The base path where the table is located
|
||||
/// * `name` The Table name
|
||||
/// * `params` The [ReadParams] to use when opening the table
|
||||
/// * `namespace_client` - Optional namespace client for server-side query execution
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * A [NativeTable] object.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn open_with_params(
|
||||
uri: &str,
|
||||
name: &str,
|
||||
@@ -1545,6 +1568,7 @@ impl NativeTable {
|
||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
params: Option<ReadParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
) -> Result<Self> {
|
||||
let params = params.unwrap_or_default();
|
||||
// patch the params if we have a write store wrapper
|
||||
@@ -1575,9 +1599,18 @@ impl NativeTable {
|
||||
uri: uri.to_string(),
|
||||
dataset,
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the namespace client for server-side query execution.
|
||||
///
|
||||
/// When set, queries will be executed on the namespace server instead of locally.
|
||||
pub fn with_namespace_client(mut self, namespace_client: Arc<dyn LanceNamespace>) -> Self {
|
||||
self.namespace_client = Some(namespace_client);
|
||||
self
|
||||
}
|
||||
|
||||
fn get_table_name(uri: &str) -> Result<String> {
|
||||
let path = Path::new(uri);
|
||||
let name = path
|
||||
@@ -1614,10 +1647,12 @@ impl NativeTable {
|
||||
/// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
|
||||
/// * `batches` RecordBatch to be saved in the database.
|
||||
/// * `params` - Write parameters.
|
||||
/// * `namespace_client` - Optional namespace client for server-side query execution
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * A [TableImpl] object.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create(
|
||||
uri: &str,
|
||||
name: &str,
|
||||
@@ -1626,6 +1661,7 @@ impl NativeTable {
|
||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
) -> Result<Self> {
|
||||
// Default params uses format v1.
|
||||
let params = params.unwrap_or(WriteParams {
|
||||
@@ -1657,9 +1693,11 @@ impl NativeTable {
|
||||
uri: uri.to_string(),
|
||||
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_empty(
|
||||
uri: &str,
|
||||
name: &str,
|
||||
@@ -1668,6 +1706,7 @@ impl NativeTable {
|
||||
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
params: Option<WriteParams>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
namespace_client: Option<Arc<dyn LanceNamespace>>,
|
||||
) -> Result<Self> {
|
||||
let batches = RecordBatchIterator::new(vec![], schema);
|
||||
Self::create(
|
||||
@@ -1678,6 +1717,7 @@ impl NativeTable {
|
||||
write_store_wrapper,
|
||||
params,
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2035,6 +2075,278 @@ impl NativeTable {
|
||||
Ok(DatasetRecordBatchStream::new(inner))
|
||||
}
|
||||
|
||||
/// Execute a query on the namespace server instead of locally.
|
||||
async fn namespace_query(
|
||||
&self,
|
||||
namespace_client: Arc<dyn LanceNamespace>,
|
||||
query: &AnyQuery,
|
||||
_options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// Build table_id from namespace + table name
|
||||
let mut table_id = self.namespace.clone();
|
||||
table_id.push(self.name.clone());
|
||||
|
||||
// Convert AnyQuery to namespace QueryTableRequest
|
||||
let mut ns_request = self.convert_to_namespace_query(query)?;
|
||||
// Set the table ID on the request
|
||||
ns_request.id = Some(table_id);
|
||||
|
||||
// Call the namespace query_table API
|
||||
let response_bytes = namespace_client
|
||||
.query_table(ns_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to execute server-side query: {}", e),
|
||||
})?;
|
||||
|
||||
// Parse the Arrow IPC response into a RecordBatchStream
|
||||
self.parse_arrow_ipc_response(response_bytes).await
|
||||
}
|
||||
|
||||
/// Convert a QueryFilter to a SQL string for the namespace API.
|
||||
fn filter_to_sql(&self, filter: &QueryFilter) -> Result<String> {
|
||||
match filter {
|
||||
QueryFilter::Sql(sql) => Ok(sql.clone()),
|
||||
QueryFilter::Substrait(_) => Err(Error::NotSupported {
|
||||
message: "Substrait filters are not supported for server-side queries".to_string(),
|
||||
}),
|
||||
QueryFilter::Datafusion(_) => Err(Error::NotSupported {
|
||||
message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert an AnyQuery to the namespace QueryTableRequest format.
|
||||
fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
|
||||
match query {
|
||||
AnyQuery::VectorQuery(vq) => {
|
||||
// Extract the query vector(s)
|
||||
let vector = self.extract_query_vector(&vq.query_vector)?;
|
||||
|
||||
// Convert filter to SQL string
|
||||
let filter = match &vq.base.filter {
|
||||
Some(f) => Some(self.filter_to_sql(f)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Convert select to columns list
|
||||
let columns = match &vq.base.select {
|
||||
Select::All => None,
|
||||
Select::Columns(cols) => Some(cols.clone()),
|
||||
Select::Dynamic(_) => {
|
||||
return Err(Error::NotSupported {
|
||||
message:
|
||||
"Dynamic column selection is not supported for server-side queries"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Check for unsupported features
|
||||
if vq.base.reranker.is_some() {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Reranker is not supported for server-side queries".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Convert FTS query if present
|
||||
let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
|
||||
let columns = fts.columns();
|
||||
let columns_vec = if columns.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(columns.into_iter().collect())
|
||||
};
|
||||
Box::new(QueryTableRequestFullTextQuery {
|
||||
string_query: Some(Box::new(StringFtsQuery {
|
||||
query: fts.query.to_string(),
|
||||
columns: columns_vec,
|
||||
})),
|
||||
structured_query: None,
|
||||
})
|
||||
});
|
||||
|
||||
Ok(NsQueryTableRequest {
|
||||
id: None, // Will be set in namespace_query
|
||||
k: vq.base.limit.unwrap_or(10) as i32,
|
||||
vector: Box::new(vector),
|
||||
vector_column: vq.column.clone(),
|
||||
filter,
|
||||
columns,
|
||||
offset: vq.base.offset.map(|o| o as i32),
|
||||
distance_type: vq.distance_type.map(|dt| dt.to_string()),
|
||||
nprobes: Some(vq.minimum_nprobes as i32),
|
||||
ef: vq.ef.map(|e| e as i32),
|
||||
refine_factor: vq.refine_factor.map(|r| r as i32),
|
||||
lower_bound: vq.lower_bound,
|
||||
upper_bound: vq.upper_bound,
|
||||
prefilter: Some(vq.base.prefilter),
|
||||
fast_search: Some(vq.base.fast_search),
|
||||
with_row_id: Some(vq.base.with_row_id),
|
||||
bypass_vector_index: Some(!vq.use_index),
|
||||
full_text_query,
|
||||
version: None,
|
||||
})
|
||||
}
|
||||
AnyQuery::Query(q) => {
|
||||
// For non-vector queries, pass an empty vector (similar to remote table implementation)
|
||||
if q.reranker.is_some() {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Reranker is not supported for server-side query execution"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let filter = q
|
||||
.filter
|
||||
.as_ref()
|
||||
.map(|f| self.filter_to_sql(f))
|
||||
.transpose()?;
|
||||
|
||||
let columns = match &q.select {
|
||||
Select::All => None,
|
||||
Select::Columns(cols) => Some(cols.clone()),
|
||||
Select::Dynamic(_) => {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Dynamic columns are not supported for server-side query"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Handle full text search if present
|
||||
let full_text_query = q.full_text_search.as_ref().map(|fts| {
|
||||
let columns_vec = if fts.columns().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(fts.columns().iter().cloned().collect())
|
||||
};
|
||||
Box::new(QueryTableRequestFullTextQuery {
|
||||
string_query: Some(Box::new(StringFtsQuery {
|
||||
query: fts.query.to_string(),
|
||||
columns: columns_vec,
|
||||
})),
|
||||
structured_query: None,
|
||||
})
|
||||
});
|
||||
|
||||
// Empty vector for non-vector queries
|
||||
let vector = Box::new(QueryTableRequestVector {
|
||||
single_vector: Some(vec![]),
|
||||
multi_vector: None,
|
||||
});
|
||||
|
||||
Ok(NsQueryTableRequest {
|
||||
id: None, // Will be set by caller
|
||||
vector,
|
||||
k: q.limit.unwrap_or(10) as i32,
|
||||
filter,
|
||||
columns,
|
||||
prefilter: Some(q.prefilter),
|
||||
offset: q.offset.map(|o| o as i32),
|
||||
ef: None,
|
||||
refine_factor: None,
|
||||
distance_type: None,
|
||||
nprobes: None,
|
||||
vector_column: None, // No vector column for plain queries
|
||||
with_row_id: Some(q.with_row_id),
|
||||
bypass_vector_index: Some(true), // No vector index for plain queries
|
||||
full_text_query,
|
||||
version: None,
|
||||
fast_search: None,
|
||||
lower_bound: None,
|
||||
upper_bound: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract query vector(s) from Arrow arrays into the namespace format.
|
||||
fn extract_query_vector(
|
||||
&self,
|
||||
query_vectors: &[Arc<dyn arrow_array::Array>],
|
||||
) -> Result<QueryTableRequestVector> {
|
||||
if query_vectors.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "Query vector is required for vector search".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Handle single vector case
|
||||
if query_vectors.len() == 1 {
|
||||
let arr = &query_vectors[0];
|
||||
let single_vector = self.array_to_f32_vec(arr)?;
|
||||
Ok(QueryTableRequestVector {
|
||||
single_vector: Some(single_vector),
|
||||
multi_vector: None,
|
||||
})
|
||||
} else {
|
||||
// Handle multi-vector case
|
||||
let multi_vector: Result<Vec<Vec<f32>>> = query_vectors
|
||||
.iter()
|
||||
.map(|arr| self.array_to_f32_vec(arr))
|
||||
.collect();
|
||||
Ok(QueryTableRequestVector {
|
||||
single_vector: None,
|
||||
multi_vector: Some(multi_vector?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert an Arrow array to a Vec<f32>.
|
||||
fn array_to_f32_vec(&self, arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
|
||||
// Handle FixedSizeList (common for vectors)
|
||||
if let Some(fsl) = arr
|
||||
.as_any()
|
||||
.downcast_ref::<arrow_array::FixedSizeListArray>()
|
||||
{
|
||||
let values = fsl.values();
|
||||
if let Some(f32_arr) = values.as_any().downcast_ref::<arrow_array::Float32Array>() {
|
||||
return Ok(f32_arr.values().to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
// Handle direct Float32Array
|
||||
if let Some(f32_arr) = arr.as_any().downcast_ref::<arrow_array::Float32Array>() {
|
||||
return Ok(f32_arr.values().to_vec());
|
||||
}
|
||||
|
||||
Err(Error::InvalidInput {
|
||||
message: "Query vector must be Float32 type".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Parse Arrow IPC response from the namespace server.
|
||||
async fn parse_arrow_ipc_response(
|
||||
&self,
|
||||
bytes: bytes::Bytes,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
use arrow_ipc::reader::StreamReader;
|
||||
use std::io::Cursor;
|
||||
|
||||
let cursor = Cursor::new(bytes);
|
||||
let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse Arrow IPC response: {}", e),
|
||||
})?;
|
||||
|
||||
// Collect all record batches
|
||||
let schema = reader.schema();
|
||||
let batches: Vec<_> = reader
|
||||
.into_iter()
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read Arrow IPC batches: {}", e),
|
||||
})?;
|
||||
|
||||
// Create a stream from the batches
|
||||
let stream = futures::stream::iter(batches.into_iter().map(Ok));
|
||||
let record_batch_stream = Box::pin(
|
||||
datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream),
|
||||
);
|
||||
|
||||
Ok(DatasetRecordBatchStream::new(record_batch_stream))
|
||||
}
|
||||
|
||||
/// Check whether the table uses V2 manifest paths.
|
||||
///
|
||||
/// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
|
||||
@@ -2466,6 +2778,12 @@ impl BaseTable for NativeTable {
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// If namespace client is configured, use server-side query execution
|
||||
if let Some(ref namespace_client) = self.namespace_client {
|
||||
return self
|
||||
.namespace_query(namespace_client.clone(), query, options)
|
||||
.await;
|
||||
}
|
||||
self.generic_query(query, options).await
|
||||
}
|
||||
|
||||
@@ -2934,7 +3252,7 @@ mod tests {
|
||||
|
||||
let batches = make_test_batches();
|
||||
let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
|
||||
let table = NativeTable::create(uri, "test", vec![], batches, None, None, None)
|
||||
let table = NativeTable::create(uri, "test", vec![], batches, None, None, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -4574,4 +4892,91 @@ mod tests {
|
||||
assert_eq!(result.len(), 1);
|
||||
assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_to_namespace_query_vector() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let dataset_path = tmp_dir.path().join("test_ns_query.lance");
|
||||
|
||||
let batches = make_test_batches();
|
||||
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table = NativeTable::open(dataset_path.to_str().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create a vector query
|
||||
let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
|
||||
let vq = VectorQueryRequest {
|
||||
base: QueryRequest {
|
||||
limit: Some(10),
|
||||
offset: Some(5),
|
||||
filter: Some(QueryFilter::Sql("id > 0".to_string())),
|
||||
select: Select::Columns(vec!["id".to_string()]),
|
||||
..Default::default()
|
||||
},
|
||||
column: Some("vector".to_string()),
|
||||
query_vector: vec![query_vector as Arc<dyn Array>],
|
||||
minimum_nprobes: 20,
|
||||
distance_type: Some(crate::DistanceType::L2),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let any_query = AnyQuery::VectorQuery(vq);
|
||||
let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
|
||||
|
||||
assert_eq!(ns_request.k, 10);
|
||||
assert_eq!(ns_request.offset, Some(5));
|
||||
assert_eq!(ns_request.filter, Some("id > 0".to_string()));
|
||||
assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
|
||||
assert_eq!(ns_request.vector_column, Some("vector".to_string()));
|
||||
assert_eq!(ns_request.distance_type, Some("l2".to_string()));
|
||||
assert!(ns_request.vector.single_vector.is_some());
|
||||
assert_eq!(
|
||||
ns_request.vector.single_vector.as_ref().unwrap(),
|
||||
&vec![1.0, 2.0, 3.0, 4.0]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_to_namespace_query_plain_query() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let dataset_path = tmp_dir.path().join("test_ns_plain.lance");
|
||||
|
||||
let batches = make_test_batches();
|
||||
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table = NativeTable::open(dataset_path.to_str().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create a plain (non-vector) query with filter and select
|
||||
let q = QueryRequest {
|
||||
limit: Some(20),
|
||||
offset: Some(5),
|
||||
filter: Some(QueryFilter::Sql("id > 5".to_string())),
|
||||
select: Select::Columns(vec!["id".to_string()]),
|
||||
with_row_id: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let any_query = AnyQuery::Query(q);
|
||||
let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
|
||||
|
||||
// Plain queries should pass an empty vector
|
||||
assert_eq!(ns_request.k, 20);
|
||||
assert_eq!(ns_request.offset, Some(5));
|
||||
assert_eq!(ns_request.filter, Some("id > 5".to_string()));
|
||||
assert_eq!(ns_request.columns, Some(vec!["id".to_string()]));
|
||||
assert_eq!(ns_request.with_row_id, Some(true));
|
||||
assert_eq!(ns_request.bypass_vector_index, Some(true));
|
||||
assert!(ns_request.vector_column.is_none()); // No vector column for plain queries
|
||||
|
||||
// Should have an empty vector
|
||||
assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user