diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 8825c1d61..23d4f7885 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -373,6 +373,19 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema: return JsonArrowSchema(fields=fields, metadata=meta) +def _builds_namespace_natively( + namespace_client_impl: Optional[str], + namespace_client_properties: Optional[Dict[str, str]], +) -> bool: + """Whether ``connect_namespace_client`` builds the namespace client natively + in Rust (installing the read-freshness context provider) rather than wrapping + the pre-built Python client. + + Must mirror Rust ``build_namespace_natively`` in ``python/src/connection.rs``. + """ + return namespace_client_impl == "rest" and bool(namespace_client_properties) + + class LanceNamespaceDBConnection(DBConnection): """ A LanceDB connection that uses a namespace for table management. @@ -432,6 +445,13 @@ class LanceNamespaceDBConnection(DBConnection): ) self._namespace_client_impl = namespace_client_impl self._namespace_client_properties = namespace_client_properties + # When the namespace client is built natively (see Rust + # ``build_namespace_natively``), the underlying Rust table performs + # QueryTable pushdown through the read-freshness context provider, which + # the pure-Python ``query_table`` path bypasses. 
+ self._route_pushdown_to_rust = _builds_namespace_natively( + namespace_client_impl, namespace_client_properties + ) self._inner = AsyncConnection( _connect_namespace_client( namespace_client, @@ -543,6 +563,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, _async=async_table, ) @@ -580,6 +601,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, _async=async_table, ) if branch is not None: @@ -875,6 +897,8 @@ class AsyncLanceNamespaceDBConnection: storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, namespace_client_pushdown_operations: Optional[List[str]] = None, + namespace_client_impl: Optional[str] = None, + namespace_client_properties: Optional[Dict[str, str]] = None, ): """ Initialize an async namespace-based LanceDB connection. @@ -900,6 +924,12 @@ class AsyncLanceNamespaceDBConnection: namespace.create_table() instead of using declare_table + local write. Default is None (no pushdown, all operations run locally). + namespace_client_impl : Optional[str] + The namespace implementation name used to create this connection. + Required (with ``namespace_client_properties``) for the Rust client to + be built natively and install the read-freshness provider. + namespace_client_properties : Optional[Dict[str, str]] + The namespace properties used to create this connection. 
""" self._namespace_client = namespace_client self.read_consistency_interval = read_consistency_interval @@ -908,6 +938,14 @@ class AsyncLanceNamespaceDBConnection: self._namespace_client_pushdown_operations = set( namespace_client_pushdown_operations or [] ) + self._namespace_client_impl = namespace_client_impl + self._namespace_client_properties = namespace_client_properties + # See LanceNamespaceDBConnection: when built natively the Rust table runs + # QueryTable pushdown through the read-freshness provider, so defer to it + # rather than the urllib3 client (which omits x-lancedb-min-timestamp). + self._route_pushdown_to_rust = _builds_namespace_natively( + namespace_client_impl, namespace_client_properties + ) self._inner = AsyncConnection( _connect_namespace_client( namespace_client, @@ -921,8 +959,8 @@ class AsyncLanceNamespaceDBConnection: namespace_client_pushdown_operations=( list(self._namespace_client_pushdown_operations) ), - namespace_client_impl=None, - namespace_client_properties=None, + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, ) ) @@ -992,6 +1030,7 @@ class AsyncLanceNamespaceDBConnection: namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, ) async def open_table( @@ -1029,6 +1068,7 @@ class AsyncLanceNamespaceDBConnection: namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, ) async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): @@ -1387,4 +1427,6 @@ def connect_namespace_async( storage_options=storage_options, session=session, namespace_client_pushdown_operations=namespace_client_pushdown_operations, + namespace_client_impl=namespace_client_impl, + 
namespace_client_properties=namespace_client_properties, ) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 3d4cefcb6..25df6b554 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -2022,6 +2022,7 @@ class LanceTable(Table): namespace_client: Optional[Any] = None, managed_versioning: Optional[bool] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, _async: AsyncTable = None, ): if namespace_path is None: @@ -2031,6 +2032,14 @@ class LanceTable(Table): self._location = location # Store location for use in _dataset_path self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + # When the connection built the namespace client natively (e.g. an + # enterprise "rest" connection), the underlying Rust table already + # executes QueryTable pushdown itself -- and, unlike this Python urllib3 + # path, it routes through the read-freshness context provider that emits + # the ``x-lancedb-min-timestamp`` header. So we must defer pushdown to + # Rust instead of calling the Python ``namespace_client.query_table`` + # directly, or reads silently bypass read-freshness (stale results). 
+ self._route_pushdown_to_rust = route_pushdown_to_rust if _async is not None: self._table = _async else: @@ -2241,6 +2250,7 @@ class LanceTable(Table): namespace_path=self._namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, location=self._location, _async=async_table, ) @@ -2391,8 +2401,11 @@ class LanceTable(Table): Returns ------- pa.Table""" - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): return self._execute_query(Query()).read_all() @@ -3344,6 +3357,7 @@ class LanceTable(Table): location: Optional[str] = None, namespace_client: Optional[Any] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, ): """ Create a new table. @@ -3406,6 +3420,7 @@ class LanceTable(Table): self._location = location self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + self._route_pushdown_to_rust = route_pushdown_to_rust if data_storage_version is not None: warnings.warn( @@ -3519,6 +3534,7 @@ class LanceTable(Table): _should_push_down_query_table( self._namespace_client, self._pushdown_operations ) + and not self._route_pushdown_to_rust and self.current_branch() is None ): from lancedb.namespace import _execute_server_side_query @@ -4260,6 +4276,7 @@ class AsyncTable: namespace_path: Optional[List[str]] = None, namespace_client: Optional[Any] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, ): """Create a new AsyncTable object. 
@@ -4272,6 +4289,9 @@ class AsyncTable: self._namespace_path = namespace_path or [] self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + # See LanceTable.__init__: defer QueryTable pushdown to Rust (which emits + # the read-freshness header) for natively-built namespace clients. + self._route_pushdown_to_rust = route_pushdown_to_rust def _set_namespace_context( self, @@ -4279,10 +4299,12 @@ class AsyncTable: namespace_path: Optional[List[str]] = None, namespace_client: Optional[Any] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, ) -> "AsyncTable": self._namespace_path = namespace_path or [] self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + self._route_pushdown_to_rust = route_pushdown_to_rust return self def __repr__(self): @@ -4492,8 +4514,11 @@ class AsyncTable: ------- pa.Table """ - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): return (await self._execute_query(Query())).read_all() @@ -5177,8 +5202,11 @@ class AsyncTable: batch_size: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> pa.RecordBatchReader: - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): from lancedb.namespace import _execute_server_side_query diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index e944df7bd..7f20bf459 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -65,6 +65,9 @@ def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable: table._namespace_path = ["geneva"] 
table._namespace_client = namespace_client table._pushdown_operations = {"QueryTable"} + # This test exercises the Python-side pushdown path (non-native client), so + # pushdown is not routed to Rust. + table._route_pushdown_to_rust = False return table @@ -805,6 +808,37 @@ class TestPushdownOperations: db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) assert len(db._namespace_client_pushdown_operations) == 0 + def test_route_pushdown_to_rust_for_native_rest(self): + """A natively-built rest connection must defer QueryTable pushdown to + Rust so reads carry the x-lancedb-min-timestamp read-freshness header.""" + db = lancedb.connect_namespace( + "rest", + {"uri": "http://localhost:12345"}, + namespace_client_pushdown_operations=["QueryTable"], + ) + assert db._route_pushdown_to_rust is True + + def test_route_pushdown_to_rust_false_for_dir(self): + """A non-native (dir) connection keeps the Python pushdown path.""" + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + assert db._route_pushdown_to_rust is False + + def test_async_route_pushdown_to_rust_for_native_rest(self): + """The async connection must not silently bypass the read-freshness fix: + a natively-built rest connection defers pushdown to Rust (regression test + for the async path omitting the freshness header).""" + db = lancedb.connect_namespace_async( + "rest", + {"uri": "http://localhost:12345"}, + namespace_client_pushdown_operations=["QueryTable"], + ) + assert db._route_pushdown_to_rust is True + + def test_async_route_pushdown_to_rust_false_for_dir(self): + """The async non-native (dir) connection keeps the Python pushdown path.""" + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + assert db._route_pushdown_to_rust is False + def test_lance_table_to_arrow_uses_query_pushdown(self): namespace_client = _NamespaceClient() table = _namespace_lance_table(namespace_client) diff --git a/python/src/connection.rs b/python/src/connection.rs index 
51713fc93..007480326 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -610,24 +610,38 @@ pub fn connect_namespace_client( namespace_client_impl: Option, namespace_client_properties: Option>, ) -> PyResult { - let namespace_client = extract_namespace_arc(py, namespace_client)?; let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64); let namespace_client_pushdown_operations = parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?; - let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string()); let ns_properties = namespace_client_properties.unwrap_or_default(); let storage_options = storage_options.unwrap_or_default(); let session = session.map(|s| s.inner.clone()); - let database = LanceNamespaceDatabase::from_namespace_client( - namespace_client, - ns_impl, - ns_properties, - storage_options, - read_consistency_interval, - session, - namespace_client_pushdown_operations, - ); + // Prefer building the namespace natively from (impl, properties) so the + // read-freshness provider is installed. + let database = if build_namespace_natively(namespace_client_impl.as_deref(), &ns_properties) { + let ns_impl = namespace_client_impl.expect("impl present per build_namespace_natively"); + crate::runtime::block_on(LanceNamespaceDatabase::connect( + &ns_impl, + ns_properties, + storage_options, + read_consistency_interval, + session, + namespace_client_pushdown_operations, + )) + .infer_error()? 
+ } else { + let namespace_client = extract_namespace_arc(py, namespace_client)?; + LanceNamespaceDatabase::from_namespace_client( + namespace_client, + namespace_client_impl.unwrap_or_else(|| "python".to_string()), + ns_properties, + storage_options, + read_consistency_interval, + session, + namespace_client_pushdown_operations, + ) + }; Ok(Connection::new(LanceConnection::new( Arc::new(database), @@ -635,6 +649,16 @@ pub fn connect_namespace_client( ))) } +/// Whether to build the namespace natively (from impl + properties) instead of +/// wrapping a pre-built client. Native construction is required for the +/// read-freshness provider to be installed. +fn build_namespace_natively( + namespace_client_impl: Option<&str>, + namespace_client_properties: &HashMap, +) -> bool { + matches!(namespace_client_impl, Some("rest")) && !namespace_client_properties.is_empty() +} + #[derive(FromPyObject)] pub struct PyClientConfig { user_agent: String, @@ -733,3 +757,36 @@ impl From for lancedb::remote::ClientConfig { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn native_build_only_for_rest_with_properties() { + let rest = props(&[("uri", "http://localhost:10024")]); + + // rest + non-empty properties -> build natively (installs the + // read-freshness provider so checkout_latest() busts the server cache). + assert!(build_namespace_natively(Some("rest"), &rest)); + + // dir is local (no server cache) -> wrap the pre-built client unchanged. + assert!(!build_namespace_natively( + Some("dir"), + &props(&[("root", "/tmp")]) + )); + + // No impl: only a pre-built client was handed in -> wrap it as-is. + assert!(!build_namespace_natively(None, &rest)); + + // rest but no properties: nothing to build a connection from -> wrap. 
+ assert!(!build_namespace_natively(Some("rest"), &HashMap::new())); + } +} diff --git a/python/src/runtime.rs b/python/src/runtime.rs index 39ebfdaa8..13951a7aa 100644 --- a/python/src/runtime.rs +++ b/python/src/runtime.rs @@ -56,6 +56,15 @@ fn get_runtime() -> &'static runtime::Runtime { unsafe { &*new_ptr } } +/// Block the current thread on a future using the shared runtime. +/// +/// For sync `#[pyfunction]`s that need to drive an async operation (e.g. +/// building a namespace client). Must not be called from within the runtime's +/// own worker threads. +pub fn block_on(fut: F) -> F::Output { + get_runtime().block_on(fut) +} + /// Runs in async-signal context after `fork()` in the child. We can only /// touch atomics here; we deliberately leak the previous runtime because /// dropping a tokio `Runtime` would try to join its (now-dead) worker diff --git a/rust/lancedb/src/table/query.rs b/rust/lancedb/src/table/query.rs index 04961f101..e38fec598 100644 --- a/rust/lancedb/src/table/query.rs +++ b/rust/lancedb/src/table/query.rs @@ -579,24 +579,45 @@ fn array_to_f32_vec(arr: &Arc) -> Result> { }) } +/// Magic bytes that prefix (and suffix) the Arrow IPC *file* format. +const ARROW_IPC_FILE_MAGIC: &[u8] = b"ARROW1"; + /// Parse Arrow IPC response from the namespace server. +/// +/// The server may return either the Arrow IPC *file* format or the *stream* +/// format. REST/phalanx returns the file format (it begins with the `ARROW1` +/// magic); reading that with a `StreamReader` fails with "failed to fill whole +/// buffer". Detect the magic and pick the matching reader so both are handled. 
async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result { - use arrow_ipc::reader::StreamReader; + use arrow_ipc::reader::{FileReader, StreamReader}; use std::io::Cursor; - let cursor = Cursor::new(bytes); - let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime { - message: format!("Failed to parse Arrow IPC response: {}", e), - })?; - - // Collect all record batches - let schema = reader.schema(); - let batches: Vec<_> = reader - .into_iter() - .collect::, _>>() - .map_err(|e| Error::Runtime { - message: format!("Failed to read Arrow IPC batches: {}", e), + let (schema, batches) = if bytes.starts_with(ARROW_IPC_FILE_MAGIC) { + let reader = FileReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime { + message: format!("Failed to parse Arrow IPC file response: {}", e), })?; + let schema = reader.schema(); + let batches = reader + .into_iter() + .collect::, _>>() + .map_err(|e| Error::Runtime { + message: format!("Failed to read Arrow IPC file batches: {}", e), + })?; + (schema, batches) + } else { + let reader = + StreamReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime { + message: format!("Failed to parse Arrow IPC response: {}", e), + })?; + let schema = reader.schema(); + let batches = reader + .into_iter() + .collect::, _>>() + .map_err(|e| Error::Runtime { + message: format!("Failed to read Arrow IPC batches: {}", e), + })?; + (schema, batches) + }; // Create a stream from the batches let stream = futures::stream::iter(batches.into_iter().map(Ok)); @@ -624,6 +645,59 @@ mod tests { FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap() } + #[tokio::test] + async fn test_parse_arrow_ipc_response_handles_file_and_stream() { + use arrow_array::{Int32Array, RecordBatch}; + use arrow_ipc::writer::{FileWriter, StreamWriter}; + use arrow_schema::{DataType, Field, Schema}; + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = 
RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef], + ) + .unwrap(); + + // Arrow IPC *file* format -- what REST/phalanx returns. Previously this + // failed with "failed to fill whole buffer" because we used a StreamReader. + let mut file_buf = Vec::new(); + { + let mut writer = FileWriter::try_new(&mut file_buf, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + assert!(file_buf.starts_with(ARROW_IPC_FILE_MAGIC)); + let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(file_buf)) + .await + .unwrap() + .try_collect::>() + .await + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum(); + assert_eq!(rows, 3); + + // Arrow IPC *stream* format must still parse. + let mut stream_buf = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut stream_buf, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + assert!(!stream_buf.starts_with(ARROW_IPC_FILE_MAGIC)); + let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(stream_buf)) + .await + .unwrap() + .try_collect::>() + .await + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum(); + assert_eq!(rows, 3); + } + #[test] fn test_convert_to_namespace_query_vector() { let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));