mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-03 02:50:41 +00:00
fix(python): push down namespace full reads (#3516)
## Bug Fix ### What is the bug? Namespace-backed `LanceTable.to_arrow()` full-table reads bypassed the existing `QueryTable` server-side query path and called the lower-level table `to_arrow()` implementation directly. In Geneva/Sophon this could fail while parsing the Arrow IPC response for `hist.get_table().to_arrow()` / `to_pandas()`, even though `hist.get_table().search().to_arrow()` worked. ### What issues or incorrect behavior does the bug cause? Full-table reads on namespace-backed tables with `QueryTable` pushdown could fail with Arrow IPC parse errors, while query/search reads on the same table succeeded. Since `to_pandas()` delegates through `to_arrow()` for non-blob/native cases, pandas export was affected too. ### How does this PR fix the problem? When `QueryTable` pushdown is enabled, sync and async table `to_arrow()` now construct a plain no-filter, no-limit, all-columns query and execute it through the table-level `_execute_query()` path. `AsyncTable` now preserves namespace context from async namespace connections so async full reads can make the same pushdown decision. Non-namespace tables and namespace tables without `QueryTable` pushdown keep their existing behavior. ### Tests - `uv run --extra tests --extra dev --no-sync ruff check python/lancedb/table.py python/lancedb/namespace.py python/tests/test_namespace.py` - `uv run --extra tests --extra dev --no-sync ruff format python/lancedb/table.py python/lancedb/namespace.py python/tests/test_namespace.py` - `uv run --extra tests --extra dev --no-sync pytest python/tests/test_namespace.py::TestPushdownOperations::test_lance_table_to_arrow_uses_query_pushdown python/tests/test_namespace.py::TestAsyncPushdownOperations::test_async_table_to_arrow_uses_query_pushdown python/tests/test_namespace.py::test_local_table_to_arrow_and_to_pandas_are_unchanged -q` - `uv run --extra tests --extra dev --no-sync pytest python/tests/test_namespace.py -q`
This commit is contained in:
@@ -144,7 +144,12 @@ def _query_to_namespace_request(
|
||||
if query.postfilter is not None:
|
||||
prefilter = not query.postfilter
|
||||
|
||||
k = query.limit if query.limit is not None else 10
|
||||
if query.limit is not None:
|
||||
k = query.limit
|
||||
elif query.vector is None and query.full_text_query is None:
|
||||
k = sys.maxsize
|
||||
else:
|
||||
k = 10
|
||||
|
||||
# Build request kwargs, only including non-None values for optional fields
|
||||
# that Pydantic doesn't accept as None
|
||||
@@ -954,7 +959,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if mode.lower() not in ["create", "overwrite"]:
|
||||
raise ValueError("mode must be either 'create' or 'overwrite'")
|
||||
validate_table_name(name)
|
||||
return await self._inner.create_table(
|
||||
table = await self._inner.create_table(
|
||||
name,
|
||||
data,
|
||||
schema=schema,
|
||||
@@ -966,6 +971,11 @@ class AsyncLanceNamespaceDBConnection:
|
||||
embedding_functions=embedding_functions,
|
||||
storage_options=storage_options,
|
||||
)
|
||||
return table._set_namespace_context(
|
||||
namespace_path=namespace_path,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
)
|
||||
|
||||
async def open_table(
|
||||
self,
|
||||
@@ -979,7 +989,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
try:
|
||||
return await self._inner.open_table(
|
||||
table = await self._inner.open_table(
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
@@ -990,6 +1000,11 @@ class AsyncLanceNamespaceDBConnection:
|
||||
table_id = namespace_path + [name]
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
return table._set_namespace_context(
|
||||
namespace_path=namespace_path,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
)
|
||||
|
||||
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
"""Drop a table from the namespace."""
|
||||
|
||||
@@ -92,6 +92,12 @@ BlobMode = Literal["lazy", "bytes", "descriptions"]
|
||||
_VALID_BLOB_MODES = ("lazy", "bytes", "descriptions")
|
||||
|
||||
|
||||
def _should_push_down_query_table(
|
||||
namespace_client: Optional[Any], pushdown_operations: set
|
||||
) -> bool:
|
||||
return namespace_client is not None and "QueryTable" in pushdown_operations
|
||||
|
||||
|
||||
def _validate_blob_mode(blob_mode: BlobMode) -> None:
|
||||
if blob_mode not in _VALID_BLOB_MODES:
|
||||
modes = ", ".join(repr(mode) for mode in _VALID_BLOB_MODES)
|
||||
@@ -2333,6 +2339,11 @@ class LanceTable(Table):
|
||||
Returns
|
||||
-------
|
||||
pa.Table"""
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
):
|
||||
return self._execute_query(Query()).read_all()
|
||||
|
||||
return LOOP.run(self._table.to_arrow())
|
||||
|
||||
def to_polars(self, batch_size=None) -> "pl.LazyFrame":
|
||||
@@ -3446,9 +3457,8 @@ class LanceTable(Table):
|
||||
batch_size: Optional[int] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
) -> pa.RecordBatchReader:
|
||||
if (
|
||||
"QueryTable" in self._pushdown_operations
|
||||
and self._namespace_client is not None
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
):
|
||||
from lancedb.namespace import _execute_server_side_query
|
||||
|
||||
@@ -4182,7 +4192,14 @@ class AsyncTable:
|
||||
[AsyncTable.create_index][lancedb.table.AsyncTable.create_index].
|
||||
"""
|
||||
|
||||
def __init__(self, table: LanceDBTable):
|
||||
def __init__(
|
||||
self,
|
||||
table: LanceDBTable,
|
||||
*,
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
):
|
||||
"""Create a new AsyncTable object.
|
||||
|
||||
You should not create AsyncTable objects directly.
|
||||
@@ -4191,6 +4208,21 @@ class AsyncTable:
|
||||
[AsyncConnection.open_table][lancedb.AsyncConnection.open_table] to obtain
|
||||
Table objects."""
|
||||
self._inner = table
|
||||
self._namespace_path = namespace_path or []
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
|
||||
def _set_namespace_context(
|
||||
self,
|
||||
*,
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
) -> "AsyncTable":
|
||||
self._namespace_path = namespace_path or []
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return self._inner.__repr__()
|
||||
@@ -4391,6 +4423,11 @@ class AsyncTable:
|
||||
-------
|
||||
pa.Table
|
||||
"""
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
):
|
||||
return (await self._execute_query(Query())).read_all()
|
||||
|
||||
return await self.query().to_arrow()
|
||||
|
||||
async def create_index(
|
||||
@@ -5068,6 +5105,14 @@ class AsyncTable:
|
||||
batch_size: Optional[int] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
) -> pa.RecordBatchReader:
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
):
|
||||
from lancedb.namespace import _execute_server_side_query
|
||||
|
||||
table_id = self._namespace_path + [self.name]
|
||||
return _execute_server_side_query(self._namespace_client, table_id, query)
|
||||
|
||||
# The sync table calls into this method, so we need to map the
|
||||
# query to the async version of the query and run that here. This is only
|
||||
# used for that code path right now.
|
||||
|
||||
@@ -5,10 +5,63 @@
|
||||
|
||||
import tempfile
|
||||
import shutil
|
||||
import sys
|
||||
import pytest
|
||||
import pyarrow as pa
|
||||
import lancedb
|
||||
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
|
||||
from lancedb.table import AsyncTable, LanceTable
|
||||
|
||||
|
||||
PUSHDOWN_DATA = pa.table(
|
||||
{"id": list(range(12)), "text": [f"row-{idx}" for idx in range(12)]}
|
||||
)
|
||||
|
||||
|
||||
def _ipc_file(table: pa.Table = PUSHDOWN_DATA) -> bytes:
|
||||
sink = pa.BufferOutputStream()
|
||||
with pa.ipc.new_file(sink, table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
return sink.getvalue().to_pybytes()
|
||||
|
||||
|
||||
class _FailingSyncInner:
|
||||
name = "hist"
|
||||
|
||||
async def schema(self):
|
||||
return PUSHDOWN_DATA.schema
|
||||
|
||||
async def to_arrow(self):
|
||||
raise RuntimeError("direct table to_arrow should not be used")
|
||||
|
||||
|
||||
class _FailingAsyncInner:
|
||||
def name(self):
|
||||
return "hist"
|
||||
|
||||
async def schema(self):
|
||||
return PUSHDOWN_DATA.schema
|
||||
|
||||
def query(self):
|
||||
raise AssertionError("direct async query should not be used")
|
||||
|
||||
|
||||
class _NamespaceClient:
|
||||
def __init__(self):
|
||||
self.requests = []
|
||||
|
||||
def query_table(self, request):
|
||||
self.requests.append(request)
|
||||
return _ipc_file()
|
||||
|
||||
|
||||
def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable:
|
||||
table = LanceTable.__new__(LanceTable)
|
||||
table._table = _FailingSyncInner()
|
||||
table._namespace_path = ["geneva"]
|
||||
table._namespace_client = namespace_client
|
||||
table._pushdown_operations = {"QueryTable"}
|
||||
return table
|
||||
|
||||
|
||||
class TestNamespaceConnection:
|
||||
@@ -736,6 +789,22 @@ class TestPushdownOperations:
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
assert len(db._namespace_client_pushdown_operations) == 0
|
||||
|
||||
def test_lance_table_to_arrow_uses_query_pushdown(self):
|
||||
namespace_client = _NamespaceClient()
|
||||
table = _namespace_lance_table(namespace_client)
|
||||
|
||||
assert table.to_arrow().equals(PUSHDOWN_DATA)
|
||||
assert table.to_pandas()["id"].tolist() == list(range(12))
|
||||
assert len(namespace_client.requests) == 2
|
||||
assert [request.id for request in namespace_client.requests] == [
|
||||
["geneva", "hist"],
|
||||
["geneva", "hist"],
|
||||
]
|
||||
assert [request.k for request in namespace_client.requests] == [
|
||||
sys.maxsize,
|
||||
sys.maxsize,
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestAsyncPushdownOperations:
|
||||
@@ -771,3 +840,39 @@ class TestAsyncPushdownOperations:
|
||||
"""Test that pushdown operations default to empty on async connection."""
|
||||
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
|
||||
assert len(db._namespace_client_pushdown_operations) == 0
|
||||
|
||||
async def test_async_table_to_arrow_uses_query_pushdown(self):
|
||||
namespace_client = _NamespaceClient()
|
||||
|
||||
table = AsyncTable(
|
||||
_FailingAsyncInner(),
|
||||
namespace_path=["geneva"],
|
||||
namespace_client=namespace_client,
|
||||
pushdown_operations={"QueryTable"},
|
||||
)
|
||||
|
||||
assert (await table.to_arrow()).equals(PUSHDOWN_DATA)
|
||||
assert (await table.to_pandas())["id"].tolist() == list(range(12))
|
||||
assert len(namespace_client.requests) == 2
|
||||
assert [request.id for request in namespace_client.requests] == [
|
||||
["geneva", "hist"],
|
||||
["geneva", "hist"],
|
||||
]
|
||||
assert [request.k for request in namespace_client.requests] == [
|
||||
sys.maxsize,
|
||||
sys.maxsize,
|
||||
]
|
||||
|
||||
|
||||
def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path):
|
||||
db = lancedb.connect(str(tmp_path / "db"))
|
||||
table = db.create_table(
|
||||
"local",
|
||||
data=[
|
||||
{"id": 1, "vector": [1.0, 2.0]},
|
||||
{"id": 2, "vector": [3.0, 4.0]},
|
||||
],
|
||||
)
|
||||
|
||||
assert table.to_arrow().column("id").to_pylist() == [1, 2]
|
||||
assert table.to_pandas()["id"].tolist() == [1, 2]
|
||||
|
||||
Reference in New Issue
Block a user