From 9d638cb3c7cb190851d5aa84b75d8a6553d4b4fd Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 2 Dec 2025 21:04:12 -0800 Subject: [PATCH] feat: support namespace server side query (#2811) Currently a table in a namespace is still backed with a `NativeTable`, which means after getting the location of the table and optional storage options override from `namespace.describe_table`, all things work like a normal local table. However, namespace also supports `query_table`, which is exactly the same API as remote table. This PR adds a `server_side_query` capability, when enabled, it runs the query by calling `namespace.query_table`. For namespace that implements the operation (e.g. REST namespace), this could hit a backend server that could execute the query faster (e.g. using a distributed engine). --- rust/lancedb/src/connection.rs | 16 + rust/lancedb/src/database.rs | 23 +- rust/lancedb/src/database/listing.rs | 15 + rust/lancedb/src/database/namespace.rs | 26 +- rust/lancedb/src/remote/db.rs | 1 + rust/lancedb/src/table.rs | 411 ++++++++++++++++++++++++- 6 files changed, 486 insertions(+), 6 deletions(-) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 663d498e..20320e9e 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -408,6 +408,7 @@ impl OpenTableBuilder { index_cache_size: None, lance_read_params: None, location: None, + namespace_client: None, }, embedding_registry, } @@ -1086,6 +1087,7 @@ pub struct ConnectNamespaceBuilder { read_consistency_interval: Option, embedding_registry: Option>, session: Option>, + server_side_query_enabled: bool, } impl ConnectNamespaceBuilder { @@ -1097,6 +1099,7 @@ impl ConnectNamespaceBuilder { read_consistency_interval: None, embedding_registry: None, session: None, + server_side_query_enabled: false, } } @@ -1151,6 +1154,18 @@ impl ConnectNamespaceBuilder { self } + /// Enable server-side query execution. + /// + /// When enabled, queries will be executed on the namespace server instead of + /// locally. This can improve performance by reducing data transfer and + /// leveraging server-side compute resources. + /// + /// Default is `false` (queries executed locally). + pub fn server_side_query(mut self, enabled: bool) -> Self { + self.server_side_query_enabled = enabled; + self + } + /// Execute the connection pub async fn execute(self) -> Result { use crate::database::namespace::LanceNamespaceDatabase; @@ -1162,6 +1177,7 @@ impl ConnectNamespaceBuilder { self.storage_options, self.read_consistency_interval, self.session, + self.server_side_query_enabled, ) .await?, ); diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index b7ce3613..5009b5fd 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -24,6 +24,7 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use futures::stream; use lance::dataset::ReadParams; use lance_datafusion::utils::StreamingWriteSource; +use lance_namespace::LanceNamespace; use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt}; use crate::error::Result; @@ -77,7 +78,7 @@ pub struct TableNamesRequest { } /// A request to open a table -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct OpenTableRequest { pub name: String, /// The namespace to open the table from. Empty list represents root namespace. @@ -87,6 +88,22 @@ pub struct OpenTableRequest { /// Optional custom location for the table. If not provided, the database will /// derive a location based on its URI and the table name. pub location: Option, + /// Optional namespace client for server-side query execution. + /// When set, queries will be executed on the namespace server instead of locally. + pub namespace_client: Option>, +} + +impl std::fmt::Debug for OpenTableRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenTableRequest") + .field("name", &self.name) + .field("namespace", &self.namespace) + .field("index_cache_size", &self.index_cache_size) + .field("lance_read_params", &self.lance_read_params) + .field("location", &self.location) + .field("namespace_client", &self.namespace_client) + .finish() + } } pub type TableBuilderCallback = Box OpenTableRequest + Send>; @@ -170,6 +187,9 @@ pub struct CreateTableRequest { /// Optional custom location for the table. If not provided, the database will /// derive a location based on its URI and the table name. pub location: Option, + /// Optional namespace client for server-side query execution. + /// When set, queries will be executed on the namespace server instead of locally. + pub namespace_client: Option>, } impl CreateTableRequest { @@ -181,6 +201,7 @@ impl CreateTableRequest { mode: CreateTableMode::default(), write_options: WriteOptions::default(), location: None, + namespace_client: None, } } } diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index a740cd30..471e9a44 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -641,6 +641,7 @@ impl ListingDatabase { index_cache_size: None, lance_read_params: None, location: None, + namespace_client: None, }; let req = (callback)(req); let table = self.open_table(req).await?; @@ -768,6 +769,7 @@ impl Database for ListingDatabase { self.store_wrapper.clone(), Some(write_params), self.read_consistency_interval, + request.namespace_client, ) .await { @@ -839,6 +841,7 @@ impl Database for ListingDatabase { self.store_wrapper.clone(), None, self.read_consistency_interval, + None, ) .await?; @@ -910,6 +913,7 @@ impl Database for ListingDatabase { self.store_wrapper.clone(), Some(read_params), self.read_consistency_interval, + request.namespace_client, ) .await?, ); @@ -1011,6 +1015,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1075,6 +1080,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1133,6 +1139,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1168,6 +1175,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1207,6 +1215,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1246,6 +1255,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1300,6 +1310,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1357,6 +1368,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1442,6 +1454,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1528,6 +1541,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); @@ -1621,6 +1635,7 @@ mod tests { mode: CreateTableMode::Create, write_options: Default::default(), location: None, + namespace_client: None, }) .await .unwrap(); diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 3e085622..8bec116d 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -40,6 +40,8 @@ pub struct LanceNamespaceDatabase { session: Option>, // database URI uri: String, + // Whether to enable server-side query execution + server_side_query_enabled: bool, } impl LanceNamespaceDatabase { @@ -49,6 +51,7 @@ impl LanceNamespaceDatabase { storage_options: HashMap, read_consistency_interval: Option, session: Option>, + server_side_query_enabled: bool, ) -> Result { let mut builder = ConnectBuilder::new(ns_impl); for (key, value) in ns_properties.clone() { @@ -67,6 +70,7 @@ impl LanceNamespaceDatabase { read_consistency_interval, session, uri: format!("namespace://{}", ns_impl), + server_side_query_enabled, }) } } @@ -76,6 +80,7 @@ impl std::fmt::Debug for LanceNamespaceDatabase { f.debug_struct("LanceNamespaceDatabase") .field("storage_options", &self.storage_options) .field("read_consistency_interval", &self.read_consistency_interval) + .field("server_side_query_enabled", &self.server_side_query_enabled) .finish() } } @@ -290,6 +295,10 @@ impl Database for LanceNamespaceDatabase { ) .await?; + let namespace_client = self + .server_side_query_enabled + .then(|| self.namespace.clone()); + return listing_db .open_table(OpenTableRequest { name: request.name.clone(), @@ -297,6 +306,7 @@ impl Database for LanceNamespaceDatabase { index_cache_size: None, lance_read_params: None, location: Some(location), + namespace_client, }) .await; } @@ -333,12 +343,16 @@ impl Database for LanceNamespaceDatabase { let listing_db = self .create_listing_database( &location, - table_id, + table_id.clone(), user_storage_options, create_empty_response.storage_options.as_ref(), ) .await?; + let namespace_client = self + .server_side_query_enabled + .then(|| self.namespace.clone()); + let create_request = DbCreateTableRequest { name: request.name, namespace: request.namespace, @@ -346,7 +360,9 @@ impl Database for LanceNamespaceDatabase { mode: request.mode, write_options: request.write_options, location: Some(location), + namespace_client, }; + listing_db.create_table(create_request).await } @@ -380,19 +396,25 @@ impl Database for LanceNamespaceDatabase { let listing_db = self .create_listing_database( &location, - table_id, + table_id.clone(), user_storage_options, response.storage_options.as_ref(), ) .await?; + let namespace_client = self + .server_side_query_enabled + .then(|| self.namespace.clone()); + let open_request = OpenTableRequest { name: request.name.clone(), namespace: request.namespace.clone(), index_cache_size: request.index_cache_size, lance_read_params: request.lance_read_params, location: Some(location), + namespace_client, }; + listing_db.open_table(open_request).await } diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index ccf79b8a..3b1fbb37 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -417,6 +417,7 @@ impl Database for RemoteDatabase { index_cache_size: None, lance_read_params: None, location: None, + namespace_client: None, }; let req = (callback)(req); self.open_table(req).await diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 79c1be35..cb0b34be 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -40,6 +40,11 @@ use lance_index::vector::pq::PQBuildParams; use lance_index::vector::sq::builder::SQBuildParams; use lance_index::DatasetIndexExt; use lance_index::IndexType; +use lance_namespace::models::{ + QueryTableRequest as NsQueryTableRequest, QueryTableRequestFullTextQuery, + QueryTableRequestVector, StringFtsQuery, +}; +use lance_namespace::LanceNamespace; use lance_table::format::Manifest; use lance_table::io::commit::ManifestNamingScheme; use log::info; @@ -1480,7 +1485,7 @@ impl NativeTableExt for Arc { } /// A table in a LanceDB database. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct NativeTable { name: String, namespace: Vec, @@ -1490,6 +1495,22 @@ pub struct NativeTable { // This comes from the connection options. We store here so we can pass down // to the dataset when we recreate it (for example, in checkout_latest). read_consistency_interval: Option, + // Optional namespace client for server-side query execution. + // When set, queries will be executed on the namespace server instead of locally. + namespace_client: Option>, +} + +impl std::fmt::Debug for NativeTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NativeTable") + .field("name", &self.name) + .field("namespace", &self.namespace) + .field("id", &self.id) + .field("uri", &self.uri) + .field("read_consistency_interval", &self.read_consistency_interval) + .field("namespace_client", &self.namespace_client) + .finish() + } } impl std::fmt::Display for NativeTable { @@ -1524,7 +1545,7 @@ impl NativeTable { /// * A [NativeTable] object. pub async fn open(uri: &str) -> Result { let name = Self::get_table_name(uri)?; - Self::open_with_params(uri, &name, vec![], None, None, None).await + Self::open_with_params(uri, &name, vec![], None, None, None, None).await } /// Opens an existing Table @@ -1534,10 +1555,12 @@ impl NativeTable { /// * `base_path` - The base path where the table is located /// * `name` The Table name /// * `params` The [ReadParams] to use when opening the table + /// * `namespace_client` - Optional namespace client for server-side query execution /// /// # Returns /// /// * A [NativeTable] object. + #[allow(clippy::too_many_arguments)] pub async fn open_with_params( uri: &str, name: &str, @@ -1545,6 +1568,7 @@ impl NativeTable { write_store_wrapper: Option>, params: Option, read_consistency_interval: Option, + namespace_client: Option>, ) -> Result { let params = params.unwrap_or_default(); // patch the params if we have a write store wrapper @@ -1575,9 +1599,18 @@ impl NativeTable { uri: uri.to_string(), dataset, read_consistency_interval, + namespace_client, }) } + /// Set the namespace client for server-side query execution. + /// + /// When set, queries will be executed on the namespace server instead of locally. + pub fn with_namespace_client(mut self, namespace_client: Arc) -> Self { + self.namespace_client = Some(namespace_client); + self + } + fn get_table_name(uri: &str) -> Result { let path = Path::new(uri); let name = path @@ -1614,10 +1647,12 @@ impl NativeTable { /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided. /// * `batches` RecordBatch to be saved in the database. /// * `params` - Write parameters. + /// * `namespace_client` - Optional namespace client for server-side query execution /// /// # Returns /// /// * A [TableImpl] object. + #[allow(clippy::too_many_arguments)] pub async fn create( uri: &str, name: &str, @@ -1626,6 +1661,7 @@ impl NativeTable { write_store_wrapper: Option>, params: Option, read_consistency_interval: Option, + namespace_client: Option>, ) -> Result { // Default params uses format v1. let params = params.unwrap_or(WriteParams { @@ -1657,9 +1693,11 @@ impl NativeTable { uri: uri.to_string(), dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), read_consistency_interval, + namespace_client, }) } + #[allow(clippy::too_many_arguments)] pub async fn create_empty( uri: &str, name: &str, @@ -1668,6 +1706,7 @@ impl NativeTable { write_store_wrapper: Option>, params: Option, read_consistency_interval: Option, + namespace_client: Option>, ) -> Result { let batches = RecordBatchIterator::new(vec![], schema); Self::create( @@ -1678,6 +1717,7 @@ impl NativeTable { write_store_wrapper, params, read_consistency_interval, + namespace_client, ) .await } @@ -2035,6 +2075,278 @@ impl NativeTable { Ok(DatasetRecordBatchStream::new(inner)) } + /// Execute a query on the namespace server instead of locally. + async fn namespace_query( + &self, + namespace_client: Arc, + query: &AnyQuery, + _options: QueryExecutionOptions, + ) -> Result { + // Build table_id from namespace + table name + let mut table_id = self.namespace.clone(); + table_id.push(self.name.clone()); + + // Convert AnyQuery to namespace QueryTableRequest + let mut ns_request = self.convert_to_namespace_query(query)?; + // Set the table ID on the request + ns_request.id = Some(table_id); + + // Call the namespace query_table API + let response_bytes = namespace_client + .query_table(ns_request) + .await + .map_err(|e| Error::Runtime { + message: format!("Failed to execute server-side query: {}", e), + })?; + + // Parse the Arrow IPC response into a RecordBatchStream + self.parse_arrow_ipc_response(response_bytes).await + } + + /// Convert a QueryFilter to a SQL string for the namespace API. + fn filter_to_sql(&self, filter: &QueryFilter) -> Result { + match filter { + QueryFilter::Sql(sql) => Ok(sql.clone()), + QueryFilter::Substrait(_) => Err(Error::NotSupported { + message: "Substrait filters are not supported for server-side queries".to_string(), + }), + QueryFilter::Datafusion(_) => Err(Error::NotSupported { + message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(), + }), + } + } + + /// Convert an AnyQuery to the namespace QueryTableRequest format. + fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result { + match query { + AnyQuery::VectorQuery(vq) => { + // Extract the query vector(s) + let vector = self.extract_query_vector(&vq.query_vector)?; + + // Convert filter to SQL string + let filter = match &vq.base.filter { + Some(f) => Some(self.filter_to_sql(f)?), + None => None, + }; + + // Convert select to columns list + let columns = match &vq.base.select { + Select::All => None, + Select::Columns(cols) => Some(cols.clone()), + Select::Dynamic(_) => { + return Err(Error::NotSupported { + message: + "Dynamic column selection is not supported for server-side queries" + .to_string(), + }); + } + }; + + // Check for unsupported features + if vq.base.reranker.is_some() { + return Err(Error::NotSupported { + message: "Reranker is not supported for server-side queries".to_string(), + }); + } + + // Convert FTS query if present + let full_text_query = vq.base.full_text_search.as_ref().map(|fts| { + let columns = fts.columns(); + let columns_vec = if columns.is_empty() { + None + } else { + Some(columns.into_iter().collect()) + }; + Box::new(QueryTableRequestFullTextQuery { + string_query: Some(Box::new(StringFtsQuery { + query: fts.query.to_string(), + columns: columns_vec, + })), + structured_query: None, + }) + }); + + Ok(NsQueryTableRequest { + id: None, // Will be set in namespace_query + k: vq.base.limit.unwrap_or(10) as i32, + vector: Box::new(vector), + vector_column: vq.column.clone(), + filter, + columns, + offset: vq.base.offset.map(|o| o as i32), + distance_type: vq.distance_type.map(|dt| dt.to_string()), + nprobes: Some(vq.minimum_nprobes as i32), + ef: vq.ef.map(|e| e as i32), + refine_factor: vq.refine_factor.map(|r| r as i32), + lower_bound: vq.lower_bound, + upper_bound: vq.upper_bound, + prefilter: Some(vq.base.prefilter), + fast_search: Some(vq.base.fast_search), + with_row_id: Some(vq.base.with_row_id), + bypass_vector_index: Some(!vq.use_index), + full_text_query, + version: None, + }) + } + AnyQuery::Query(q) => { + // For non-vector queries, pass an empty vector (similar to remote table implementation) + if q.reranker.is_some() { + return Err(Error::NotSupported { + message: "Reranker is not supported for server-side query execution" + .to_string(), + }); + } + + let filter = q + .filter + .as_ref() + .map(|f| self.filter_to_sql(f)) + .transpose()?; + + let columns = match &q.select { + Select::All => None, + Select::Columns(cols) => Some(cols.clone()), + Select::Dynamic(_) => { + return Err(Error::NotSupported { + message: "Dynamic columns are not supported for server-side query" + .to_string(), + }); + } + }; + + // Handle full text search if present + let full_text_query = q.full_text_search.as_ref().map(|fts| { + let columns_vec = if fts.columns().is_empty() { + None + } else { + Some(fts.columns().iter().cloned().collect()) + }; + Box::new(QueryTableRequestFullTextQuery { + string_query: Some(Box::new(StringFtsQuery { + query: fts.query.to_string(), + columns: columns_vec, + })), + structured_query: None, + }) + }); + + // Empty vector for non-vector queries + let vector = Box::new(QueryTableRequestVector { + single_vector: Some(vec![]), + multi_vector: None, + }); + + Ok(NsQueryTableRequest { + id: None, // Will be set by caller + vector, + k: q.limit.unwrap_or(10) as i32, + filter, + columns, + prefilter: Some(q.prefilter), + offset: q.offset.map(|o| o as i32), + ef: None, + refine_factor: None, + distance_type: None, + nprobes: None, + vector_column: None, // No vector column for plain queries + with_row_id: Some(q.with_row_id), + bypass_vector_index: Some(true), // No vector index for plain queries + full_text_query, + version: None, + fast_search: None, + lower_bound: None, + upper_bound: None, + }) + } + } + } + + /// Extract query vector(s) from Arrow arrays into the namespace format. + fn extract_query_vector( + &self, + query_vectors: &[Arc], + ) -> Result { + if query_vectors.is_empty() { + return Err(Error::InvalidInput { + message: "Query vector is required for vector search".to_string(), + }); + } + + // Handle single vector case + if query_vectors.len() == 1 { + let arr = &query_vectors[0]; + let single_vector = self.array_to_f32_vec(arr)?; + Ok(QueryTableRequestVector { + single_vector: Some(single_vector), + multi_vector: None, + }) + } else { + // Handle multi-vector case + let multi_vector: Result>> = query_vectors + .iter() + .map(|arr| self.array_to_f32_vec(arr)) + .collect(); + Ok(QueryTableRequestVector { + single_vector: None, + multi_vector: Some(multi_vector?), + }) + } + } + + /// Convert an Arrow array to a Vec. + fn array_to_f32_vec(&self, arr: &Arc) -> Result> { + // Handle FixedSizeList (common for vectors) + if let Some(fsl) = arr + .as_any() + .downcast_ref::() + { + let values = fsl.values(); + if let Some(f32_arr) = values.as_any().downcast_ref::() { + return Ok(f32_arr.values().to_vec()); + } + } + + // Handle direct Float32Array + if let Some(f32_arr) = arr.as_any().downcast_ref::() { + return Ok(f32_arr.values().to_vec()); + } + + Err(Error::InvalidInput { + message: "Query vector must be Float32 type".to_string(), + }) + } + + /// Parse Arrow IPC response from the namespace server. + async fn parse_arrow_ipc_response( + &self, + bytes: bytes::Bytes, + ) -> Result { + use arrow_ipc::reader::StreamReader; + use std::io::Cursor; + + let cursor = Cursor::new(bytes); + let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime { + message: format!("Failed to parse Arrow IPC response: {}", e), + })?; + + // Collect all record batches + let schema = reader.schema(); + let batches: Vec<_> = reader + .into_iter() + .collect::, _>>() + .map_err(|e| Error::Runtime { + message: format!("Failed to read Arrow IPC batches: {}", e), + })?; + + // Create a stream from the batches + let stream = futures::stream::iter(batches.into_iter().map(Ok)); + let record_batch_stream = Box::pin( + datafusion_physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream), + ); + + Ok(DatasetRecordBatchStream::new(record_batch_stream)) + } + /// Check whether the table uses V2 manifest paths. /// /// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for @@ -2466,6 +2778,12 @@ impl BaseTable for NativeTable { query: &AnyQuery, options: QueryExecutionOptions, ) -> Result { + // If namespace client is configured, use server-side query execution + if let Some(ref namespace_client) = self.namespace_client { + return self + .namespace_query(namespace_client.clone(), query, options) + .await; + } self.generic_query(query, options).await } @@ -2934,7 +3252,7 @@ mod tests { let batches = make_test_batches(); let batches = Box::new(batches) as Box; - let table = NativeTable::create(uri, "test", vec![], batches, None, None, None) + let table = NativeTable::create(uri, "test", vec![], batches, None, None, None, None) .await .unwrap(); @@ -4574,4 +4892,91 @@ mod tests { assert_eq!(result.len(), 1); assert_eq!(result[0].index_type, crate::index::IndexType::Bitmap); } + + #[tokio::test] + async fn test_convert_to_namespace_query_vector() { + let tmp_dir = tempdir().unwrap(); + let dataset_path = tmp_dir.path().join("test_ns_query.lance"); + + let batches = make_test_batches(); + Dataset::write(batches, dataset_path.to_str().unwrap(), None) + .await + .unwrap(); + + let table = NativeTable::open(dataset_path.to_str().unwrap()) + .await + .unwrap(); + + // Create a vector query + let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0])); + let vq = VectorQueryRequest { + base: QueryRequest { + limit: Some(10), + offset: Some(5), + filter: Some(QueryFilter::Sql("id > 0".to_string())), + select: Select::Columns(vec!["id".to_string()]), + ..Default::default() + }, + column: Some("vector".to_string()), + query_vector: vec![query_vector as Arc], + minimum_nprobes: 20, + distance_type: Some(crate::DistanceType::L2), + ..Default::default() + }; + + let any_query = AnyQuery::VectorQuery(vq); + let ns_request = table.convert_to_namespace_query(&any_query).unwrap(); + + assert_eq!(ns_request.k, 10); + assert_eq!(ns_request.offset, Some(5)); + assert_eq!(ns_request.filter, Some("id > 0".to_string())); + assert_eq!(ns_request.columns, Some(vec!["id".to_string()])); + assert_eq!(ns_request.vector_column, Some("vector".to_string())); + assert_eq!(ns_request.distance_type, Some("l2".to_string())); + assert!(ns_request.vector.single_vector.is_some()); + assert_eq!( + ns_request.vector.single_vector.as_ref().unwrap(), + &vec![1.0, 2.0, 3.0, 4.0] + ); + } + + #[tokio::test] + async fn test_convert_to_namespace_query_plain_query() { + let tmp_dir = tempdir().unwrap(); + let dataset_path = tmp_dir.path().join("test_ns_plain.lance"); + + let batches = make_test_batches(); + Dataset::write(batches, dataset_path.to_str().unwrap(), None) + .await + .unwrap(); + + let table = NativeTable::open(dataset_path.to_str().unwrap()) + .await + .unwrap(); + + // Create a plain (non-vector) query with filter and select + let q = QueryRequest { + limit: Some(20), + offset: Some(5), + filter: Some(QueryFilter::Sql("id > 5".to_string())), + select: Select::Columns(vec!["id".to_string()]), + with_row_id: true, + ..Default::default() + }; + + let any_query = AnyQuery::Query(q); + let ns_request = table.convert_to_namespace_query(&any_query).unwrap(); + + // Plain queries should pass an empty vector + assert_eq!(ns_request.k, 20); + assert_eq!(ns_request.offset, Some(5)); + assert_eq!(ns_request.filter, Some("id > 5".to_string())); + assert_eq!(ns_request.columns, Some(vec!["id".to_string()])); + assert_eq!(ns_request.with_row_id, Some(true)); + assert_eq!(ns_request.bypass_vector_index, Some(true)); + assert!(ns_request.vector_column.is_none()); // No vector column for plain queries + + // Should have an empty vector + assert!(ns_request.vector.single_vector.as_ref().unwrap().is_empty()); + } }