From 3e25f584eb1fb8020d3311bc824f903cd195f4f9 Mon Sep 17 00:00:00 2001
From: Yang Cen
Date: Mon, 8 Jun 2026 19:48:40 +0800
Subject: [PATCH] fix(python): push down namespace full reads (#3516)

## Bug Fix

### What is the bug?

Namespace-backed `LanceTable.to_arrow()` full-table reads bypassed the existing `QueryTable` server-side query path and called the lower-level table `to_arrow()` implementation directly. In Geneva/Sophon this could fail while parsing the Arrow IPC response for `hist.get_table().to_arrow()` / `to_pandas()`, even though `hist.get_table().search().to_arrow()` worked.

### What issues or incorrect behavior does the bug cause?

Full-table reads on namespace-backed tables with `QueryTable` pushdown could fail with Arrow IPC parse errors, while query/search reads on the same table succeeded. Since `to_pandas()` delegates through `to_arrow()` for non-blob/native cases, pandas export was affected too.

### How does this PR fix the problem?

When `QueryTable` pushdown is enabled, sync and async table `to_arrow()` now construct a plain no-filter, no-limit, all-columns query and execute it through the table-level `_execute_query()` path. `AsyncTable` now preserves namespace context from async namespace connections so async full reads can make the same pushdown decision. Non-namespace tables and namespace tables without `QueryTable` pushdown keep their existing behavior.

### Tests - `uv run --extra tests --extra dev --no-sync ruff check python/lancedb/table.py python/lancedb/namespace.py python/tests/test_namespace.py` - `uv run --extra tests --extra dev --no-sync ruff format python/lancedb/table.py python/lancedb/namespace.py python/tests/test_namespace.py` - `uv run --extra tests --extra dev --no-sync pytest python/tests/test_namespace.py::TestPushdownOperations::test_lance_table_to_arrow_uses_query_pushdown python/tests/test_namespace.py::TestAsyncPushdownOperations::test_async_table_to_arrow_uses_query_pushdown python/tests/test_namespace.py::test_local_table_to_arrow_and_to_pandas_are_unchanged -q` - `uv run --extra tests --extra dev --no-sync pytest python/tests/test_namespace.py -q` --- python/python/lancedb/namespace.py | 21 +++++- python/python/lancedb/table.py | 53 ++++++++++++- python/python/tests/test_namespace.py | 105 ++++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 7 deletions(-) diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 14f6dd25a..8784bc19b 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -144,7 +144,12 @@ def _query_to_namespace_request( if query.postfilter is not None: prefilter = not query.postfilter - k = query.limit if query.limit is not None else 10 + if query.limit is not None: + k = query.limit + elif query.vector is None and query.full_text_query is None: + k = sys.maxsize + else: + k = 10 # Build request kwargs, only including non-None values for optional fields # that Pydantic doesn't accept as None @@ -954,7 +959,7 @@ class AsyncLanceNamespaceDBConnection: if mode.lower() not in ["create", "overwrite"]: raise ValueError("mode must be either 'create' or 'overwrite'") validate_table_name(name) - return await self._inner.create_table( + table = await self._inner.create_table( name, data, schema=schema, @@ -966,6 +971,11 @@ class AsyncLanceNamespaceDBConnection: embedding_functions=embedding_functions, 
storage_options=storage_options, ) + return table._set_namespace_context( + namespace_path=namespace_path, + namespace_client=self._namespace_client, + pushdown_operations=self._namespace_client_pushdown_operations, + ) async def open_table( self, @@ -979,7 +989,7 @@ class AsyncLanceNamespaceDBConnection: if namespace_path is None: namespace_path = [] try: - return await self._inner.open_table( + table = await self._inner.open_table( name, namespace_path=namespace_path, storage_options=storage_options, @@ -990,6 +1000,11 @@ class AsyncLanceNamespaceDBConnection: table_id = namespace_path + [name] raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}") raise + return table._set_namespace_context( + namespace_path=namespace_path, + namespace_client=self._namespace_client, + pushdown_operations=self._namespace_client_pushdown_operations, + ) async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): """Drop a table from the namespace.""" diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index e6b48fdce..e57774ca2 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -92,6 +92,12 @@ BlobMode = Literal["lazy", "bytes", "descriptions"] _VALID_BLOB_MODES = ("lazy", "bytes", "descriptions") +def _should_push_down_query_table( + namespace_client: Optional[Any], pushdown_operations: set +) -> bool: + return namespace_client is not None and "QueryTable" in pushdown_operations + + def _validate_blob_mode(blob_mode: BlobMode) -> None: if blob_mode not in _VALID_BLOB_MODES: modes = ", ".join(repr(mode) for mode in _VALID_BLOB_MODES) @@ -2333,6 +2339,11 @@ class LanceTable(Table): Returns ------- pa.Table""" + if _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ): + return self._execute_query(Query()).read_all() + return LOOP.run(self._table.to_arrow()) def to_polars(self, batch_size=None) -> "pl.LazyFrame": @@ -3446,9 +3457,8 @@ class LanceTable(Table): 
batch_size: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> pa.RecordBatchReader: - if ( - "QueryTable" in self._pushdown_operations - and self._namespace_client is not None + if _should_push_down_query_table( + self._namespace_client, self._pushdown_operations ): from lancedb.namespace import _execute_server_side_query @@ -4182,7 +4192,14 @@ class AsyncTable: [AsyncTable.create_index][lancedb.table.AsyncTable.create_index]. """ - def __init__(self, table: LanceDBTable): + def __init__( + self, + table: LanceDBTable, + *, + namespace_path: Optional[List[str]] = None, + namespace_client: Optional[Any] = None, + pushdown_operations: Optional[set] = None, + ): """Create a new AsyncTable object. You should not create AsyncTable objects directly. @@ -4191,6 +4208,21 @@ class AsyncTable: [AsyncConnection.open_table][lancedb.AsyncConnection.open_table] to obtain Table objects.""" self._inner = table + self._namespace_path = namespace_path or [] + self._namespace_client = namespace_client + self._pushdown_operations = pushdown_operations or set() + + def _set_namespace_context( + self, + *, + namespace_path: Optional[List[str]] = None, + namespace_client: Optional[Any] = None, + pushdown_operations: Optional[set] = None, + ) -> "AsyncTable": + self._namespace_path = namespace_path or [] + self._namespace_client = namespace_client + self._pushdown_operations = pushdown_operations or set() + return self def __repr__(self): return self._inner.__repr__() @@ -4391,6 +4423,11 @@ class AsyncTable: ------- pa.Table """ + if _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ): + return (await self._execute_query(Query())).read_all() + return await self.query().to_arrow() async def create_index( @@ -5068,6 +5105,14 @@ class AsyncTable: batch_size: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> pa.RecordBatchReader: + if _should_push_down_query_table( + self._namespace_client, self._pushdown_operations + ): + from 
lancedb.namespace import _execute_server_side_query + + table_id = self._namespace_path + [self.name] + return _execute_server_side_query(self._namespace_client, table_id, query) + # The sync table calls into this method, so we need to map the # query to the async version of the query and run that here. This is only # used for that code path right now. diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index f9c923ad2..8417e7384 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -5,10 +5,63 @@ import tempfile import shutil +import sys import pytest import pyarrow as pa import lancedb from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError +from lancedb.table import AsyncTable, LanceTable + + +PUSHDOWN_DATA = pa.table( + {"id": list(range(12)), "text": [f"row-{idx}" for idx in range(12)]} +) + + +def _ipc_file(table: pa.Table = PUSHDOWN_DATA) -> bytes: + sink = pa.BufferOutputStream() + with pa.ipc.new_file(sink, table.schema) as writer: + writer.write_table(table) + return sink.getvalue().to_pybytes() + + +class _FailingSyncInner: + name = "hist" + + async def schema(self): + return PUSHDOWN_DATA.schema + + async def to_arrow(self): + raise RuntimeError("direct table to_arrow should not be used") + + +class _FailingAsyncInner: + def name(self): + return "hist" + + async def schema(self): + return PUSHDOWN_DATA.schema + + def query(self): + raise AssertionError("direct async query should not be used") + + +class _NamespaceClient: + def __init__(self): + self.requests = [] + + def query_table(self, request): + self.requests.append(request) + return _ipc_file() + + +def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable: + table = LanceTable.__new__(LanceTable) + table._table = _FailingSyncInner() + table._namespace_path = ["geneva"] + table._namespace_client = namespace_client + table._pushdown_operations = {"QueryTable"} + return table 
class TestNamespaceConnection: @@ -736,6 +789,22 @@ class TestPushdownOperations: db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) assert len(db._namespace_client_pushdown_operations) == 0 + def test_lance_table_to_arrow_uses_query_pushdown(self): + namespace_client = _NamespaceClient() + table = _namespace_lance_table(namespace_client) + + assert table.to_arrow().equals(PUSHDOWN_DATA) + assert table.to_pandas()["id"].tolist() == list(range(12)) + assert len(namespace_client.requests) == 2 + assert [request.id for request in namespace_client.requests] == [ + ["geneva", "hist"], + ["geneva", "hist"], + ] + assert [request.k for request in namespace_client.requests] == [ + sys.maxsize, + sys.maxsize, + ] + @pytest.mark.asyncio class TestAsyncPushdownOperations: @@ -771,3 +840,39 @@ class TestAsyncPushdownOperations: """Test that pushdown operations default to empty on async connection.""" db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) assert len(db._namespace_client_pushdown_operations) == 0 + + async def test_async_table_to_arrow_uses_query_pushdown(self): + namespace_client = _NamespaceClient() + + table = AsyncTable( + _FailingAsyncInner(), + namespace_path=["geneva"], + namespace_client=namespace_client, + pushdown_operations={"QueryTable"}, + ) + + assert (await table.to_arrow()).equals(PUSHDOWN_DATA) + assert (await table.to_pandas())["id"].tolist() == list(range(12)) + assert len(namespace_client.requests) == 2 + assert [request.id for request in namespace_client.requests] == [ + ["geneva", "hist"], + ["geneva", "hist"], + ] + assert [request.k for request in namespace_client.requests] == [ + sys.maxsize, + sys.maxsize, + ] + + +def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path): + db = lancedb.connect(str(tmp_path / "db")) + table = db.create_table( + "local", + data=[ + {"id": 1, "vector": [1.0, 2.0]}, + {"id": 2, "vector": [3.0, 4.0]}, + ], + ) + + assert table.to_arrow().column("id").to_pylist() == [1, 
2] + assert table.to_pandas()["id"].tolist() == [1, 2]