From 1f3a7d4be600220c0f32f0acd814c3ebd94d1b9c Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 2 Jun 2026 18:31:28 +0800 Subject: [PATCH] feat(python): support blob modes in query to_pandas --- python/python/lancedb/query.py | 306 +++++++++++++++++++++++--- python/python/lancedb/table.py | 15 +- python/python/tests/test_namespace.py | 29 +++ python/python/tests/test_query.py | 120 ++++++++++ python/python/tests/test_table.py | 69 +++--- 5 files changed, 471 insertions(+), 68 deletions(-) diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index ee3e7c7cd..d146e25bf 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -41,6 +41,14 @@ from .rerankers.rrf import RRFReranker from .rerankers.util import check_reranker_result from .util import flatten_columns +BlobMode = Literal["lazy", "bytes", "descriptions"] + +_BLOB_MODE_TO_HANDLING = { + "lazy": "blobs_descriptions", + "bytes": "all_binary", + "descriptions": "blobs_descriptions", +} + if TYPE_CHECKING: import sys @@ -55,7 +63,7 @@ if TYPE_CHECKING: from ._lancedb import VectorQuery as LanceVectorQuery from .common import VEC from .pydantic import LanceModel - from .table import Table + from .table import AsyncTable, Table if sys.version_info >= (3, 11): from typing import Self @@ -65,6 +73,147 @@ if TYPE_CHECKING: T = TypeVar("T", bound="LanceModel") +def _validate_blob_mode(blob_mode: BlobMode) -> None: + if blob_mode not in _BLOB_MODE_TO_HANDLING: + modes = ", ".join(repr(mode) for mode in _BLOB_MODE_TO_HANDLING) + raise ValueError(f"blob_mode must be one of {modes}, got {blob_mode!r}") + + +def _field_is_blob(field: pa.Field) -> bool: + metadata = field.metadata or {} + return metadata.get(b"lance-encoding:blob") == b"true" or ( + metadata.get("lance-encoding:blob") == "true" + ) + + +def _schema_has_blob_field(schema: pa.Schema) -> bool: + return any(_field_is_blob(field) for field in schema) + + +def _blob_mode_requires_native_pandas(blob_mode: BlobMode, schema: pa.Schema) -> bool: + return blob_mode in ("lazy", "bytes") and _schema_has_blob_field(schema) + + +def _unsupported_blob_pandas_error(reason: str) -> RuntimeError: + return RuntimeError( + "blob_mode='lazy' and blob_mode='bytes' require Lance native pandas " + f"conversion for queries that return blob columns, but {reason}. " + "Use blob_mode='descriptions' or remove blob columns from the projection." + ) + + +def _query_is_plain_scan(query: Query) -> bool: + return ( + query.vector is None + and query.full_text_query is None + and not query.postfilter + and not query.order_by + ) + + +def _filter_to_sql(filter: Optional[Union[str, Expr]]) -> Optional[str]: + if filter is None: + return None + if isinstance(filter, Expr): + return filter.to_sql() + return filter + + +def _projection_to_scanner_kwargs( + columns: Optional[ + Union[ + List[str], List[Tuple[str, Union[str, Expr]]], Dict[str, Union[str, Expr]] + ] + ], +) -> Dict[str, Any]: + if columns is None: + return {} + if isinstance(columns, list): + if all(isinstance(column, str) for column in columns): + return {"columns": columns} + if all(isinstance(column, tuple) and len(column) == 2 for column in columns): + return { + "columns": { + name: expr.to_sql() if isinstance(expr, Expr) else expr + for name, expr in columns + } + } + # Let Lance raise the detailed projection validation error. + return {"columns": columns} + + projection = {} + for name, expr in columns.items(): + if isinstance(expr, Expr): + expr = expr.to_sql() + projection[name] = expr + return {"columns": projection} + + +def _scanner_kwargs_for_query(query: Query, blob_mode: BlobMode) -> Dict[str, Any]: + kwargs = { + **_projection_to_scanner_kwargs(query.columns), + "filter": _filter_to_sql(query.filter), + "limit": query.limit, + "offset": query.offset, + "with_row_id": query.with_row_id, + "fast_search": query.fast_search, + "blob_handling": _BLOB_MODE_TO_HANDLING[blob_mode], + } + return {key: value for key, value in kwargs.items() if value is not None} + + +def _ensure_lazy_blob_frame( + df: "pd.DataFrame", schema: pa.Schema, blob_mode: BlobMode +) -> "pd.DataFrame": + if blob_mode != "lazy" or not _schema_has_blob_field(schema) or len(df) == 0: + return df + + for field in schema: + if not _field_is_blob(field) or field.name not in df.columns: + continue + value = df[field.name].iloc[0] + if value is not None and not hasattr(value, "readall"): + raise _unsupported_blob_pandas_error( + "the Lance scanner did not return lazy blob files" + ) + return df + + +def _scanner_to_pandas(scanner: Any, blob_mode: BlobMode, **kwargs) -> "pd.DataFrame": + schema = getattr(scanner, "projected_schema", None) + if schema is None: + schema = getattr(scanner, "schema", None) + if schema is None: + schema = getattr(scanner, "dataset_schema", None) + if callable(schema): + schema = schema() + if hasattr(scanner, "to_pandas"): + try: + df = scanner.to_pandas(blob_mode=blob_mode, **kwargs) + except TypeError as err: + message = str(err) + if "blob_mode" not in message and "unexpected keyword" not in message: + raise + df = scanner.to_pandas(**kwargs) + if schema is not None: + return _ensure_lazy_blob_frame(df, schema, blob_mode) + return df + + if hasattr(scanner, "to_pyarrow"): + reader = scanner.to_pyarrow() + tbl = reader.read_all() + elif hasattr(scanner, "to_table"): + tbl = scanner.to_table() + else: + reader = scanner.to_reader() + tbl = reader.read_all() + if blob_mode == "lazy" and _schema_has_blob_field(tbl.schema): + raise _unsupported_blob_pandas_error( + "the Lance scanner does not expose to_pandas" + ) + return tbl.to_pandas(**kwargs) + + # Pydantic validation function for vector queries def ensure_vector_query( val: Any, @@ -718,6 +867,7 @@ class LanceQueryBuilder(ABC): self, flatten: Optional[Union[int, bool]] = None, *, + blob_mode: BlobMode = "lazy", timeout: Optional[timedelta] = None, **kwargs, ) -> "pd.DataFrame": @@ -737,11 +887,32 @@ class LanceQueryBuilder(ABC): timeout: Optional[timedelta] The maximum time to wait for the query to complete. If None, wait indefinitely. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. + Vector, FTS, hybrid, and other non-native query shapes keep the + existing Arrow conversion path and only support blob descriptions. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ + _validate_blob_mode(blob_mode) + native_error = None + if flatten is None and timeout is None: + try: + df = self._plain_scan_to_pandas(blob_mode, **kwargs) + if df is not None: + return df + except Exception as err: + native_error = err + tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten) + if _blob_mode_requires_native_pandas(blob_mode, tbl.schema): + reason = ( + "this query shape cannot use Lance native pandas conversion" + if native_error is None + else str(native_error) + ) + raise _unsupported_blob_pandas_error(reason) from native_error return tbl.to_pandas(**kwargs) @abstractmethod @@ -1086,6 +1257,19 @@ class LanceQueryBuilder(ABC): """ raise NotImplementedError + def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + query = self.to_query_object() + if not _query_is_plain_scan(query): + return None + + dataset = self._table.to_lance() + scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode)) + return _scanner_to_pandas(scanner, blob_mode, **kwargs) + @abstractmethod def to_query_object(self) -> Query: """Return a serializable representation of the query @@ -2207,7 +2391,11 @@ class AsyncQueryBase(object): Base class for all async queries (take, scan, vector, fts, hybrid) """ - def __init__(self, inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery]): + def __init__( + self, + inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery], + table: Optional["AsyncTable"] = None, + ): """ Construct an AsyncQueryBase @@ -2215,6 +2403,7 @@ class AsyncQueryBase(object): [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ self._inner = inner + self._table = table def to_query_object(self) -> Query: """ @@ -2357,6 +2546,8 @@ class AsyncQueryBase(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + *, + blob_mode: BlobMode = "lazy", **kwargs, ) -> "pd.DataFrame": """ @@ -2390,13 +2581,49 @@ class AsyncQueryBase(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. + Vector, FTS, hybrid, and other non-native query shapes keep the + existing Arrow conversion path and only support blob descriptions. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ - return ( - flatten_columns(await self.to_arrow(timeout=timeout), flatten) - ).to_pandas(**kwargs) + _validate_blob_mode(blob_mode) + native_error = None + if flatten is None and timeout is None: + try: + df = await self._plain_scan_to_pandas(blob_mode, **kwargs) + if df is not None: + return df + except Exception as err: + native_error = err + + tbl = flatten_columns(await self.to_arrow(timeout=timeout), flatten) + if _blob_mode_requires_native_pandas(blob_mode, tbl.schema): + reason = ( + "this query shape cannot use Lance native pandas conversion" + if native_error is None + else str(native_error) + ) + raise _unsupported_blob_pandas_error(reason) from native_error + return tbl.to_pandas(**kwargs) + + async def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + if self._table is None: + return None + + query = self.to_query_object() + if not _query_is_plain_scan(query): + return None + + dataset = await self._table._to_lance() + scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode)) + return _scanner_to_pandas(scanner, blob_mode, **kwargs) async def to_polars( self, @@ -2503,14 +2730,18 @@ class AsyncStandardQuery(AsyncQueryBase): Base class for "standard" async queries (all but take currently) """ - def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]): + def __init__( + self, + inner: Union[LanceQuery, LanceVectorQuery], + table: Optional["AsyncTable"] = None, + ): """ Construct an AsyncStandardQuery This method is not intended to be called directly. Instead, use the [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ - super().__init__(inner) + super().__init__(inner, table) def where(self, predicate: Union[str, Expr]) -> Self: """ @@ -2616,14 +2847,14 @@ class AsyncStandardQuery(AsyncQueryBase): class AsyncQuery(AsyncStandardQuery): - def __init__(self, inner: LanceQuery): + def __init__(self, inner: LanceQuery, table: Optional["AsyncTable"] = None): """ Construct an AsyncQuery This method is not intended to be called directly. Instead, use the [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ - super().__init__(inner) + super().__init__(inner, table) self._inner = inner @classmethod @@ -2707,10 +2938,11 @@ class AsyncQuery(AsyncStandardQuery): new_self = self._inner.nearest_to(query_vectors[0]) for v in query_vectors[1:]: new_self.add_query_vector(v) - return AsyncVectorQuery(new_self) + return AsyncVectorQuery(new_self, self._table) else: return AsyncVectorQuery( - self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)) + self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)), + self._table, ) def nearest_to_text( @@ -2743,17 +2975,18 @@ class AsyncQuery(AsyncStandardQuery): if isinstance(query, str): return AsyncFTSQuery( - self._inner.nearest_to_text({"query": query, "columns": columns}) + self._inner.nearest_to_text({"query": query, "columns": columns}), + self._table, ) # FullTextQuery object - return AsyncFTSQuery(self._inner.nearest_to_text({"query": query})) + return AsyncFTSQuery(self._inner.nearest_to_text({"query": query}), self._table) class AsyncFTSQuery(AsyncStandardQuery): """A query for full text search for LanceDB.""" - def __init__(self, inner: LanceFTSQuery): - super().__init__(inner) + def __init__(self, inner: LanceFTSQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) self._inner = inner self._reranker = None @@ -2835,10 +3068,11 @@ class AsyncFTSQuery(AsyncStandardQuery): new_self = self._inner.nearest_to(query_vectors[0]) for v in query_vectors[1:]: new_self.add_query_vector(v) - return AsyncHybridQuery(new_self) + return AsyncHybridQuery(new_self, self._table) else: return AsyncHybridQuery( - self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)) + self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)), + self._table, ) async def to_batches( @@ -3029,7 +3263,7 @@ class AsyncVectorQueryBase: class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): - def __init__(self, inner: LanceVectorQuery): + def __init__(self, inner: LanceVectorQuery, table: Optional["AsyncTable"] = None): """ Construct an AsyncVectorQuery @@ -3039,7 +3273,7 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): a vector query. Or you can use [AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search] """ - super().__init__(inner) + super().__init__(inner, table) self._inner = inner self._reranker = None self._query_string = None @@ -3093,10 +3327,13 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): if isinstance(query, str): return AsyncHybridQuery( - self._inner.nearest_to_text({"query": query, "columns": columns}) + self._inner.nearest_to_text({"query": query, "columns": columns}), + self._table, ) # FullTextQuery object - return AsyncHybridQuery(self._inner.nearest_to_text({"query": query})) + return AsyncHybridQuery( + self._inner.nearest_to_text({"query": query}), self._table + ) async def to_batches( self, @@ -3123,8 +3360,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase): in the `rerank` method to convert the scores to ranks and then normalize them. """ - def __init__(self, inner: LanceHybridQuery): - super().__init__(inner) + def __init__(self, inner: LanceHybridQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) self._inner = inner self._norm = "score" self._reranker = RRFReranker() @@ -3165,8 +3402,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase): max_batch_length: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> AsyncRecordBatchReader: - fts_query = AsyncFTSQuery(self._inner.to_fts_query()) - vec_query = AsyncVectorQuery(self._inner.to_vector_query()) + fts_query = AsyncFTSQuery(self._inner.to_fts_query(), self._table) + vec_query = AsyncVectorQuery(self._inner.to_vector_query(), self._table) # save the row ID choice that was made on the query builder and force it # to actually fetch the row ids because we need this for reranking @@ -3266,8 +3503,15 @@ class AsyncTakeQuery(AsyncQueryBase): Builder for parameterizing and executing take queries. """ - def __init__(self, inner: LanceTakeQuery): - super().__init__(inner) + def __init__(self, inner: LanceTakeQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) + + async def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + return None class BaseQueryBuilder(object): @@ -3400,6 +3644,8 @@ class BaseQueryBuilder(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + *, + blob_mode: BlobMode = "lazy", **kwargs, ) -> "pd.DataFrame": """ @@ -3433,11 +3679,15 @@ class BaseQueryBuilder(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ - return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs)) + return LOOP.run( + self._inner.to_pandas(flatten, timeout, blob_mode=blob_mode, **kwargs) + ) def to_polars( self, diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 2de369419..f2307dc1b 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -2270,9 +2270,10 @@ class LanceTable(Table): ------- pd.DataFrame """ - if blob_mode == "lazy" and ( - self._namespace_client is not None - or get_uri_scheme(self._dataset_path) == "memory" + if ( + blob_mode == "lazy" + and self._namespace_client is None + and get_uri_scheme(self._dataset_path) == "memory" ): return self.to_arrow().to_pandas(**kwargs) @@ -4280,7 +4281,7 @@ class AsyncTable: can be executed with methods like [to_arrow][lancedb.query.AsyncQuery.to_arrow], [to_pandas][lancedb.query.AsyncQuery.to_pandas] and more. """ - return AsyncQuery(self._inner.query()) + return AsyncQuery(self._inner.query(), self) async def _to_lance(self, **kwargs) -> lance.LanceDataset: try: @@ -4312,7 +4313,7 @@ class AsyncTable: ------- pd.DataFrame """ - if blob_mode == "lazy": + if blob_mode == "lazy" and get_uri_scheme(await self.uri()) == "memory": return (await self.to_arrow()).to_pandas(**kwargs) return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs) @@ -5349,7 +5350,7 @@ class AsyncTable: pa.RecordBatch A record batch containing the rows at the given offsets. """ - return AsyncTakeQuery(self._inner.take_offsets(offsets)) + return AsyncTakeQuery(self._inner.take_offsets(offsets), self) def take_row_ids(self, row_ids: list[int]) -> AsyncTakeQuery: """ @@ -5378,7 +5379,7 @@ class AsyncTable: AsyncTakeQuery A query object that can be executed to get the rows. """ - return AsyncTakeQuery(self._inner.take_row_ids(row_ids)) + return AsyncTakeQuery(self._inner.take_row_ids(row_ids), self) @property def tags(self) -> AsyncTags: diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index bbf7e4c6f..f9c923ad2 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -76,6 +76,35 @@ class TestNamespaceConnection: assert len(result) == 0 assert list(result.columns) == ["id", "vector", "text"] + def test_table_to_pandas_blob_lazy_through_namespace(self): + """Namespace-backed tables should use Lance blob-aware pandas conversion.""" + pytest.importorskip("lance") + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + db.create_namespace(["test_ns"]) + data = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", + pa.large_binary(), + metadata={"lance-encoding:blob": "true"}, + ), + ] + ), + ) + + table = db.create_table("blob_table", data, namespace_path=["test_ns"]) + df = table.to_pandas(blob_mode="lazy").sort_values("id") + + blob = df["blob"].iloc[0] + assert hasattr(blob, "readall") + assert blob.readall() == b"hello" + def test_open_table_through_namespace(self): """Test opening an existing table through namespace.""" db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) diff --git a/python/python/tests/test_query.py b/python/python/tests/test_query.py index 891ed808f..8215bfbe4 100644 --- a/python/python/tests/test_query.py +++ b/python/python/tests/test_query.py @@ -39,6 +39,35 @@ from utils import exception_output from importlib.util import find_spec +def _blob_query_data(): + return pa.table( + { + "id": pa.array([1, 2, 3, 4], pa.int64()), + "tag": pa.array(["drop", "keep", "keep", "keep"], pa.utf8()), + "vector": pa.array( + [[1.0, 0.0], [2.0, 0.0], [3.0, 0.0], [4.0, 0.0]], + type=pa.list_(pa.float32(), list_size=2), + ), + "blob": pa.array([b"one", b"two", b"three", b"four"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("tag", pa.utf8()), + pa.field("vector", pa.list_(pa.float32(), list_size=2)), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + + +def _assert_lazy_blob(value, expected: bytes): + assert hasattr(value, "readall") + assert value.readall() == expected + + @pytest.fixture(scope="module") def table(tmpdir_factory) -> lancedb.table.Table: tmp_path = str(tmpdir_factory.mktemp("data")) @@ -181,6 +210,97 @@ async def test_query_to_pandas_kwargs(table, table_async): assert async_df["id"].tolist() == [1, 2] +@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"]) +def test_plain_scan_query_to_pandas_blob_modes(tmp_db, blob_mode): + pytest.importorskip("lance") + table = tmp_db.create_table( + f"test_query_to_pandas_blob_{blob_mode}", _blob_query_data() + ) + + df = ( + table.search() + .select(["id", "blob"]) + .where("id = 1") + .to_pandas(blob_mode=blob_mode) + ) + + assert df["id"].tolist() == [1] + if blob_mode == "lazy": + _assert_lazy_blob(df["blob"].iloc[0], b"one") + elif blob_mode == "bytes": + assert df["blob"].tolist() == [b"one"] + else: + first = df["blob"].iloc[0] + assert first != b"one" + assert not hasattr(first, "readall") + + +def test_plain_scan_query_to_pandas_blob_projection(tmp_db): + pytest.importorskip("lance") + table = tmp_db.create_table( + "test_query_to_pandas_blob_projection", _blob_query_data() + ) + + df = ( + table.search() + .where("id >= 2") + .select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"}) + .limit(2) + .offset(1) + .to_pandas(blob_mode="bytes") + ) + + assert df["id_alias"].tolist() == [3, 4] + assert df["payload"].tolist() == [b"three", b"four"] + assert df["double_id"].tolist() == [6, 8] + + +@pytest.mark.asyncio +async def test_async_plain_scan_query_to_pandas_blob_projection(tmp_db_async): + pytest.importorskip("lance") + table = await tmp_db_async.create_table( + "test_async_query_to_pandas_blob_projection", _blob_query_data() + ) + + lazy_df = await ( + table.query().where("id = 1").select(["id", "blob"]).to_pandas(blob_mode="lazy") + ) + assert lazy_df["id"].tolist() == [1] + _assert_lazy_blob(lazy_df["blob"].iloc[0], b"one") + + bytes_df = await ( + table.query() + .where("id >= 2") + .select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"}) + .limit(2) + .offset(1) + .to_pandas(blob_mode="bytes") + ) + assert bytes_df["id_alias"].tolist() == [3, 4] + assert bytes_df["payload"].tolist() == [b"three", b"four"] + assert bytes_df["double_id"].tolist() == [6, 8] + + desc_df = await ( + table.query() + .where("id = 1") + .select(["blob"]) + .to_pandas(blob_mode="descriptions") + ) + first = desc_df["blob"].iloc[0] + assert first != b"one" + assert not hasattr(first, "readall") + + +def test_vector_query_to_pandas_blob_mode_requires_native_path(tmp_db): + pytest.importorskip("lance") + table = tmp_db.create_table("test_vector_query_blob_mode", _blob_query_data()) + + with pytest.raises(RuntimeError, match="Lance native pandas conversion"): + table.search([1.0, 0.0]).select(["blob", "vector"]).limit(1).to_pandas( + blob_mode="lazy" + ) + + def test_order_by_plain_query(mem_db): table = mem_db.create_table( "test_order_by", diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index 2a07c2df6..984834cb2 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -26,6 +26,28 @@ from lancedb.table import LanceTable from pydantic import BaseModel +def _blob_test_data(): + return pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + + +def _assert_lazy_blob(value, expected: bytes): + assert hasattr(value, "readall") + assert value.readall() == expected + + def test_basic(mem_db: DBConnection): data = [ {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, @@ -57,27 +79,22 @@ def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection): pd.testing.assert_frame_equal(table.to_pandas(), expected) -def test_table_to_pandas_blob_bytes(tmp_db: DBConnection): +@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"]) +def test_table_to_pandas_blob_modes(tmp_db: DBConnection, blob_mode): pytest.importorskip("lance") - data = pa.table( - { - "id": pa.array([1, 2], pa.int64()), - "blob": pa.array([b"hello", b"world"], pa.large_binary()), - }, - schema=pa.schema( - [ - pa.field("id", pa.int64()), - pa.field( - "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} - ), - ] - ), - ) - table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data) + table = tmp_db.create_table(f"test_to_pandas_blob_{blob_mode}", _blob_test_data()) - df = table.to_pandas(blob_mode="bytes") + df = table.to_pandas(blob_mode=blob_mode) - assert df["blob"].tolist() == [b"hello", b"world"] + if blob_mode == "lazy": + _assert_lazy_blob(df["blob"].iloc[0], b"hello") + _assert_lazy_blob(df["blob"].iloc[1], b"world") + elif blob_mode == "bytes": + assert df["blob"].tolist() == [b"hello", b"world"] + else: + first = df["blob"].iloc[0] + assert first != b"hello" + assert not hasattr(first, "readall") def test_table_to_pandas_kwargs(tmp_db: DBConnection): @@ -93,22 +110,8 @@ def test_table_to_pandas_kwargs(tmp_db: DBConnection): @pytest.mark.asyncio async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection): pytest.importorskip("lance") - data = pa.table( - { - "id": pa.array([1, 2], pa.int64()), - "blob": pa.array([b"hello", b"world"], pa.large_binary()), - }, - schema=pa.schema( - [ - pa.field("id", pa.int64()), - pa.field( - "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} - ), - ] - ), - ) table = await tmp_db_async.create_table( - "test_async_to_pandas_blob_bytes", data=data + "test_async_to_pandas_blob_bytes", data=_blob_test_data() ) df = await table.to_pandas(blob_mode="bytes")