diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 8825c1d61..8d6fa61a6 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -373,6 +373,19 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema: return JsonArrowSchema(fields=fields, metadata=meta) +def _builds_namespace_natively( + namespace_client_impl: Optional[str], + namespace_client_properties: Optional[Dict[str, str]], +) -> bool: + """Whether ``connect_namespace_client`` builds the namespace client natively + in Rust (installing the read-freshness context provider) rather than wrapping + the pre-built Python client. + + Must mirror Rust ``build_namespace_natively`` in ``python/src/connection.rs``. + """ + return namespace_client_impl == "rest" and bool(namespace_client_properties) + + class LanceNamespaceDBConnection(DBConnection): """ A LanceDB connection that uses a namespace for table management. @@ -432,6 +445,13 @@ class LanceNamespaceDBConnection(DBConnection): ) self._namespace_client_impl = namespace_client_impl self._namespace_client_properties = namespace_client_properties + # When the namespace client is built natively (see Rust + # ``build_namespace_natively``), the underlying Rust table performs + # QueryTable pushdown through the read-freshness context provider, which + # the pure-Python ``query_table`` path bypasses. + self._route_pushdown_to_rust = _builds_namespace_natively( + namespace_client_impl, namespace_client_properties + ) self._inner = AsyncConnection( _connect_namespace_client( namespace_client, @@ -543,6 +563,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, _async=async_table, ) @@ -580,6 +601,7 @@ class LanceNamespaceDBConnection(DBConnection): namespace_path=namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, _async=async_table, ) if branch is not None: diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 3d4cefcb6..b29d9dbd5 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -2022,6 +2022,7 @@ class LanceTable(Table): namespace_client: Optional[Any] = None, managed_versioning: Optional[bool] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, _async: AsyncTable = None, ): if namespace_path is None: @@ -2031,6 +2032,14 @@ class LanceTable(Table): self._location = location # Store location for use in _dataset_path self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + # When the connection built the namespace client natively (e.g. an + # enterprise "rest" connection), the underlying Rust table already + # executes QueryTable pushdown itself -- and, unlike this Python urllib3 + # path, it routes through the read-freshness context provider that emits + # the ``x-lancedb-min-timestamp`` header. So we must defer pushdown to + # Rust instead of calling the Python ``namespace_client.query_table`` + # directly, or reads silently bypass read-freshness (stale results). + self._route_pushdown_to_rust = route_pushdown_to_rust if _async is not None: self._table = _async else: @@ -2241,6 +2250,7 @@ class LanceTable(Table): namespace_path=self._namespace_path, namespace_client=self._namespace_client, pushdown_operations=self._pushdown_operations, + route_pushdown_to_rust=self._route_pushdown_to_rust, location=self._location, _async=async_table, ) @@ -2391,8 +2401,11 @@ class LanceTable(Table): Returns ------- pa.Table""" - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): return self._execute_query(Query()).read_all() @@ -3519,6 +3532,7 @@ class LanceTable(Table): _should_push_down_query_table( self._namespace_client, self._pushdown_operations ) + and not self._route_pushdown_to_rust and self.current_branch() is None ): from lancedb.namespace import _execute_server_side_query @@ -4260,6 +4274,7 @@ class AsyncTable: namespace_path: Optional[List[str]] = None, namespace_client: Optional[Any] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, ): """Create a new AsyncTable object. @@ -4272,6 +4287,9 @@ class AsyncTable: self._namespace_path = namespace_path or [] self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + # See LanceTable.__init__: defer QueryTable pushdown to Rust (which emits + # the read-freshness header) for natively-built namespace clients. + self._route_pushdown_to_rust = route_pushdown_to_rust def _set_namespace_context( self, @@ -4279,10 +4297,12 @@ class AsyncTable: namespace_path: Optional[List[str]] = None, namespace_client: Optional[Any] = None, pushdown_operations: Optional[set] = None, + route_pushdown_to_rust: bool = False, ) -> "AsyncTable": self._namespace_path = namespace_path or [] self._namespace_client = namespace_client self._pushdown_operations = pushdown_operations or set() + self._route_pushdown_to_rust = route_pushdown_to_rust return self def __repr__(self): @@ -4492,8 +4512,11 @@ class AsyncTable: ------- pa.Table """ - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): return (await self._execute_query(Query())).read_all() @@ -5177,8 +5200,11 @@ class AsyncTable: batch_size: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> pa.RecordBatchReader: - if _should_push_down_query_table( - self._namespace_client, self._pushdown_operations + if ( + _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ) + and not self._route_pushdown_to_rust ): from lancedb.namespace import _execute_server_side_query diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index e944df7bd..b3e8fd200 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -65,6 +65,9 @@ def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable: table._namespace_path = ["geneva"] table._namespace_client = namespace_client table._pushdown_operations = {"QueryTable"} + # This test exercises the Python-side pushdown path (non-native client), so + # pushdown is not routed to Rust. + table._route_pushdown_to_rust = False return table diff --git a/rust/lancedb/src/table/query.rs b/rust/lancedb/src/table/query.rs index 04961f101..e38fec598 100644 --- a/rust/lancedb/src/table/query.rs +++ b/rust/lancedb/src/table/query.rs @@ -579,24 +579,45 @@ fn array_to_f32_vec(arr: &Arc) -> Result> { }) } +/// Magic bytes that prefix (and suffix) the Arrow IPC *file* format. +const ARROW_IPC_FILE_MAGIC: &[u8] = b"ARROW1"; + /// Parse Arrow IPC response from the namespace server. +/// +/// The server may return either the Arrow IPC *file* format or the *stream* +/// format. REST/phalanx returns the file format (it begins with the `ARROW1` +/// magic); reading that with a `StreamReader` fails with "failed to fill whole +/// buffer". Detect the magic and pick the matching reader so both are handled. async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result { - use arrow_ipc::reader::StreamReader; + use arrow_ipc::reader::{FileReader, StreamReader}; use std::io::Cursor; - let cursor = Cursor::new(bytes); - let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime { - message: format!("Failed to parse Arrow IPC response: {}", e), - })?; - - // Collect all record batches - let schema = reader.schema(); - let batches: Vec<_> = reader - .into_iter() - .collect::, _>>() - .map_err(|e| Error::Runtime { - message: format!("Failed to read Arrow IPC batches: {}", e), + let (schema, batches) = if bytes.starts_with(ARROW_IPC_FILE_MAGIC) { + let reader = FileReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime { + message: format!("Failed to parse Arrow IPC file response: {}", e), })?; + let schema = reader.schema(); + let batches = reader + .into_iter() + .collect::, _>>() + .map_err(|e| Error::Runtime { + message: format!("Failed to read Arrow IPC file batches: {}", e), + })?; + (schema, batches) + } else { + let reader = + StreamReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime { + message: format!("Failed to parse Arrow IPC response: {}", e), + })?; + let schema = reader.schema(); + let batches = reader + .into_iter() + .collect::, _>>() + .map_err(|e| Error::Runtime { + message: format!("Failed to read Arrow IPC batches: {}", e), + })?; + (schema, batches) + }; // Create a stream from the batches let stream = futures::stream::iter(batches.into_iter().map(Ok)); @@ -624,6 +645,59 @@ mod tests { FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap() } + #[tokio::test] + async fn test_parse_arrow_ipc_response_handles_file_and_stream() { + use arrow_array::{Int32Array, RecordBatch}; + use arrow_ipc::writer::{FileWriter, StreamWriter}; + use arrow_schema::{DataType, Field, Schema}; + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef], + ) + .unwrap(); + + // Arrow IPC *file* format -- what REST/phalanx returns. Previously this + // failed with "failed to fill whole buffer" because we used a StreamReader. + let mut file_buf = Vec::new(); + { + let mut writer = FileWriter::try_new(&mut file_buf, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + assert!(file_buf.starts_with(ARROW_IPC_FILE_MAGIC)); + let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(file_buf)) + .await + .unwrap() + .try_collect::>() + .await + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum(); + assert_eq!(rows, 3); + + // Arrow IPC *stream* format must still parse. + let mut stream_buf = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut stream_buf, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + assert!(!stream_buf.starts_with(ARROW_IPC_FILE_MAGIC)); + let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(stream_buf)) + .await + .unwrap() + .try_collect::>() + .await + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum(); + assert_eq!(rows, 3); + } + #[test] fn test_convert_to_namespace_query_vector() { let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));