mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-03 11:00:40 +00:00
feat(python): support blob modes in query to_pandas (#3487)
## Feature - What is the new feature? - Adds `blob_mode` support to sync and async Python query `to_pandas()` APIs. - Enables plain scan queries to return blob columns as lazy `BlobFile` objects, raw bytes, or blob descriptions. - Lets namespace-backed local tables use Lance native blob-aware pandas conversion for lazy blobs. - Why do we need this feature? - Table and Lance dataset/scanner APIs already support blob-aware pandas conversion, but LanceDB query builders did not expose that capability. - Geneva and other callers should be able to use query-level `to_pandas(blob_mode=...)` without manually constructing Lance scanners. - How does it work? - Plain scan queries route through Lance scanner native `to_pandas(blob_mode=...)`, preserving filter, projection, limit, offset, row id, and alias/expression projection behavior. - Non-native query shapes keep existing Arrow fallback semantics and raise a clear error when they return blob columns with `blob_mode="lazy"` or `blob_mode="bytes"`. - Focused tests cover table/query blob modes, filter/select/limit/offset/alias query cases, async query behavior, vector-query error boundaries, and namespace-backed lazy blobs. ## Validation - `cd python && .venv/bin/maturin develop --uv --extras tests,dev --profile dev` - `cd python && uv run --frozen --no-sync pytest python/tests/test_table.py::test_table_to_pandas_blob_modes python/tests/test_table.py::test_async_table_to_pandas_blob_bytes python/tests/test_query.py::test_plain_scan_query_to_pandas_blob_modes python/tests/test_query.py::test_plain_scan_query_to_pandas_blob_projection python/tests/test_query.py::test_async_plain_scan_query_to_pandas_blob_projection python/tests/test_query.py::test_vector_query_to_pandas_blob_mode_requires_native_path python/tests/test_namespace.py::TestNamespaceConnection::test_table_to_pandas_blob_lazy_through_namespace -q` - `cd python && uv run --frozen --no-sync ruff format --check .` - `cd python && uv run --frozen --no-sync ruff check .` - `git diff --check`
This commit is contained in:
@@ -39,6 +39,35 @@ from utils import exception_output
|
||||
from importlib.util import find_spec
|
||||
|
||||
|
||||
def _blob_query_data():
|
||||
return pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2, 3, 4], pa.int64()),
|
||||
"tag": pa.array(["drop", "keep", "keep", "keep"], pa.utf8()),
|
||||
"vector": pa.array(
|
||||
[[1.0, 0.0], [2.0, 0.0], [3.0, 0.0], [4.0, 0.0]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
"blob": pa.array([b"one", b"two", b"three", b"four"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field("tag", pa.utf8()),
|
||||
pa.field("vector", pa.list_(pa.float32(), list_size=2)),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _assert_lazy_blob(value, expected: bytes):
|
||||
assert hasattr(value, "readall")
|
||||
assert value.readall() == expected
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def table(tmpdir_factory) -> lancedb.table.Table:
|
||||
tmp_path = str(tmpdir_factory.mktemp("data"))
|
||||
@@ -181,6 +210,138 @@ async def test_query_to_pandas_kwargs(table, table_async):
|
||||
assert async_df["id"].tolist() == [1, 2]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"])
|
||||
def test_plain_scan_query_to_pandas_blob_modes(tmp_db, blob_mode):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
f"test_query_to_pandas_blob_{blob_mode}", _blob_query_data()
|
||||
)
|
||||
|
||||
df = (
|
||||
table.search()
|
||||
.select(["id", "blob"])
|
||||
.where("id = 1")
|
||||
.to_pandas(blob_mode=blob_mode)
|
||||
)
|
||||
|
||||
assert df["id"].tolist() == [1]
|
||||
if blob_mode == "lazy":
|
||||
_assert_lazy_blob(df["blob"].iloc[0], b"one")
|
||||
elif blob_mode == "bytes":
|
||||
assert df["blob"].tolist() == [b"one"]
|
||||
else:
|
||||
first = df["blob"].iloc[0]
|
||||
assert first != b"one"
|
||||
assert not hasattr(first, "readall")
|
||||
|
||||
|
||||
def test_plain_scan_query_to_pandas_blob_projection(tmp_db):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
"test_query_to_pandas_blob_projection", _blob_query_data()
|
||||
)
|
||||
|
||||
df = (
|
||||
table.search()
|
||||
.where("id >= 2")
|
||||
.select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"})
|
||||
.limit(2)
|
||||
.offset(1)
|
||||
.to_pandas(blob_mode="bytes")
|
||||
)
|
||||
|
||||
assert df["id_alias"].tolist() == [3, 4]
|
||||
assert df["payload"].tolist() == [b"three", b"four"]
|
||||
assert df["double_id"].tolist() == [6, 8]
|
||||
|
||||
|
||||
def test_plain_scan_query_to_pandas_blob_mode_does_not_collect_arrow(
|
||||
tmp_db, monkeypatch
|
||||
):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
"test_query_to_pandas_blob_no_arrow_collect", _blob_query_data()
|
||||
)
|
||||
query = table.search().where("id = 1").select(["id", "blob"])
|
||||
|
||||
def fail_to_arrow(*args, **kwargs):
|
||||
raise AssertionError("to_arrow should not be called before native pandas")
|
||||
|
||||
monkeypatch.setattr(query, "to_arrow", fail_to_arrow)
|
||||
|
||||
df = query.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["id"].tolist() == [1]
|
||||
assert df["blob"].tolist() == [b"one"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_plain_scan_query_to_pandas_blob_projection(tmp_db_async):
|
||||
pytest.importorskip("lance")
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_query_to_pandas_blob_projection", _blob_query_data()
|
||||
)
|
||||
|
||||
lazy_df = await (
|
||||
table.query().where("id = 1").select(["id", "blob"]).to_pandas(blob_mode="lazy")
|
||||
)
|
||||
assert lazy_df["id"].tolist() == [1]
|
||||
_assert_lazy_blob(lazy_df["blob"].iloc[0], b"one")
|
||||
|
||||
bytes_df = await (
|
||||
table.query()
|
||||
.where("id >= 2")
|
||||
.select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"})
|
||||
.limit(2)
|
||||
.offset(1)
|
||||
.to_pandas(blob_mode="bytes")
|
||||
)
|
||||
assert bytes_df["id_alias"].tolist() == [3, 4]
|
||||
assert bytes_df["payload"].tolist() == [b"three", b"four"]
|
||||
assert bytes_df["double_id"].tolist() == [6, 8]
|
||||
|
||||
desc_df = await (
|
||||
table.query()
|
||||
.where("id = 1")
|
||||
.select(["blob"])
|
||||
.to_pandas(blob_mode="descriptions")
|
||||
)
|
||||
first = desc_df["blob"].iloc[0]
|
||||
assert first != b"one"
|
||||
assert not hasattr(first, "readall")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_plain_scan_query_to_pandas_blob_mode_does_not_collect_arrow(
|
||||
tmp_db_async, monkeypatch
|
||||
):
|
||||
pytest.importorskip("lance")
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_query_to_pandas_blob_no_arrow_collect", _blob_query_data()
|
||||
)
|
||||
query = table.query().where("id = 1").select(["id", "blob"])
|
||||
|
||||
async def fail_to_arrow(*args, **kwargs):
|
||||
raise AssertionError("to_arrow should not be called before native pandas")
|
||||
|
||||
monkeypatch.setattr(query, "to_arrow", fail_to_arrow)
|
||||
|
||||
df = await query.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["id"].tolist() == [1]
|
||||
assert df["blob"].tolist() == [b"one"]
|
||||
|
||||
|
||||
def test_vector_query_to_pandas_blob_mode_requires_native_path(tmp_db):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table("test_vector_query_blob_mode", _blob_query_data())
|
||||
|
||||
with pytest.raises(RuntimeError, match="Lance native pandas conversion"):
|
||||
table.search([1.0, 0.0]).select(["blob", "vector"]).limit(1).to_pandas(
|
||||
blob_mode="lazy"
|
||||
)
|
||||
|
||||
|
||||
def test_order_by_plain_query(mem_db):
|
||||
table = mem_db.create_table(
|
||||
"test_order_by",
|
||||
|
||||
Reference in New Issue
Block a user