From 6f18eb4ccecf851a4451f28af4bccca1f0276d33 Mon Sep 17 00:00:00 2001 From: Yang Cen Date: Wed, 3 Jun 2026 19:15:44 +0800 Subject: [PATCH] feat(python): support blob modes in query to_pandas (#3487) ## Feature - What is the new feature? - Adds `blob_mode` support to sync and async Python query `to_pandas()` APIs. - Enables plain scan queries to return blob columns as lazy `BlobFile` objects, raw bytes, or blob descriptions. - Lets namespace-backed local tables use Lance native blob-aware pandas conversion for lazy blobs. - Why do we need this feature? - Table and Lance dataset/scanner APIs already support blob-aware pandas conversion, but LanceDB query builders did not expose that capability. - Geneva and other callers should be able to use query-level `to_pandas(blob_mode=...)` without manually constructing Lance scanners. - How does it work? - Plain scan queries route through Lance scanner native `to_pandas(blob_mode=...)`, preserving filter, projection, limit, offset, row id, and alias/expression projection behavior. - Non-native query shapes keep existing Arrow fallback semantics and raise a clear error when they return blob columns with `blob_mode="lazy"` or `blob_mode="bytes"`. - Focused tests cover table/query blob modes, filter/select/limit/offset/alias query cases, async query behavior, vector-query error boundaries, and namespace-backed lazy blobs. ## Validation - `cd python && .venv/bin/maturin develop --uv --extras tests,dev --profile dev` - `cd python && uv run --frozen --no-sync pytest python/tests/test_table.py::test_table_to_pandas_blob_modes python/tests/test_table.py::test_async_table_to_pandas_blob_bytes python/tests/test_query.py::test_plain_scan_query_to_pandas_blob_modes python/tests/test_query.py::test_plain_scan_query_to_pandas_blob_projection python/tests/test_query.py::test_async_plain_scan_query_to_pandas_blob_projection python/tests/test_query.py::test_vector_query_to_pandas_blob_mode_requires_native_path python/tests/test_namespace.py::TestNamespaceConnection::test_table_to_pandas_blob_lazy_through_namespace -q` - `cd python && uv run --frozen --no-sync ruff format --check .` - `cd python && uv run --frozen --no-sync ruff check .` - `git diff --check` --- python/python/lancedb/query.py | 319 +++++++++++++++++++++++--- python/python/lancedb/table.py | 45 +++- python/python/tests/test_namespace.py | 29 +++ python/python/tests/test_query.py | 161 +++++++++++++ python/python/tests/test_table.py | 90 +++++--- 5 files changed, 576 insertions(+), 68 deletions(-) diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index ee3e7c7cd..7fa018892 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -41,6 +41,14 @@ from .rerankers.rrf import RRFReranker from .rerankers.util import check_reranker_result from .util import flatten_columns +BlobMode = Literal["lazy", "bytes", "descriptions"] + +_BLOB_MODE_TO_HANDLING = { + "lazy": "blobs_descriptions", + "bytes": "all_binary", + "descriptions": "blobs_descriptions", +} + if TYPE_CHECKING: import sys @@ -55,7 +63,7 @@ if TYPE_CHECKING: from ._lancedb import VectorQuery as LanceVectorQuery from .common import VEC from .pydantic import LanceModel - from .table import Table + from .table import AsyncTable, Table if sys.version_info >= (3, 11): from typing import Self @@ -65,6 +73,147 @@ if TYPE_CHECKING: T = TypeVar("T", bound="LanceModel") +def _validate_blob_mode(blob_mode: BlobMode) -> None: + if blob_mode not in _BLOB_MODE_TO_HANDLING: + modes = ", ".join(repr(mode) for mode in _BLOB_MODE_TO_HANDLING) + raise ValueError(f"blob_mode must be one of {modes}, got {blob_mode!r}") + + +def _field_is_blob(field: pa.Field) -> bool: + metadata = field.metadata or {} + return metadata.get(b"lance-encoding:blob") == b"true" or ( + metadata.get("lance-encoding:blob") == "true" + ) + + +def _schema_has_blob_field(schema: pa.Schema) -> bool: + return any(_field_is_blob(field) for field in schema) + + +def _blob_mode_requires_native_pandas(blob_mode: BlobMode, schema: pa.Schema) -> bool: + return blob_mode in ("lazy", "bytes") and _schema_has_blob_field(schema) + + +def _unsupported_blob_pandas_error(reason: str) -> RuntimeError: + return RuntimeError( + "blob_mode='lazy' and blob_mode='bytes' require Lance native pandas " + f"conversion for queries that return blob columns, but {reason}. " + "Use blob_mode='descriptions' or remove blob columns from the projection." + ) + + +def _query_is_plain_scan(query: Query) -> bool: + return ( + query.vector is None + and query.full_text_query is None + and not query.postfilter + and not query.order_by + ) + + +def _filter_to_sql(filter: Optional[Union[str, Expr]]) -> Optional[str]: + if filter is None: + return None + if isinstance(filter, Expr): + return filter.to_sql() + return filter + + +def _projection_to_scanner_kwargs( + columns: Optional[ + Union[ + List[str], List[Tuple[str, Union[str, Expr]]], Dict[str, Union[str, Expr]] + ] + ], +) -> Dict[str, Any]: + if columns is None: + return {} + if isinstance(columns, list): + if all(isinstance(column, str) for column in columns): + return {"columns": columns} + if all(isinstance(column, tuple) and len(column) == 2 for column in columns): + return { + "columns": { + name: expr.to_sql() if isinstance(expr, Expr) else expr + for name, expr in columns + } + } + # Let Lance raise the detailed projection validation error. + return {"columns": columns} + + projection = {} + for name, expr in columns.items(): + if isinstance(expr, Expr): + expr = expr.to_sql() + projection[name] = expr + return {"columns": projection} + + +def _scanner_kwargs_for_query(query: Query, blob_mode: BlobMode) -> Dict[str, Any]: + kwargs = { + **_projection_to_scanner_kwargs(query.columns), + "filter": _filter_to_sql(query.filter), + "limit": query.limit, + "offset": query.offset, + "with_row_id": query.with_row_id, + "fast_search": query.fast_search, + "blob_handling": _BLOB_MODE_TO_HANDLING[blob_mode], + } + return {key: value for key, value in kwargs.items() if value is not None} + + +def _ensure_lazy_blob_frame( + df: "pd.DataFrame", schema: pa.Schema, blob_mode: BlobMode +) -> "pd.DataFrame": + if blob_mode != "lazy" or not _schema_has_blob_field(schema) or len(df) == 0: + return df + + for field in schema: + if not _field_is_blob(field) or field.name not in df.columns: + continue + value = df[field.name].iloc[0] + if value is not None and not hasattr(value, "readall"): + raise _unsupported_blob_pandas_error( + "the Lance scanner did not return lazy blob files" + ) + return df + + +def _scanner_to_pandas(scanner: Any, blob_mode: BlobMode, **kwargs) -> "pd.DataFrame": + schema = getattr(scanner, "projected_schema", None) + if schema is None: + schema = getattr(scanner, "schema", None) + if schema is None: + schema = getattr(scanner, "dataset_schema", None) + if callable(schema): + schema = schema() + if hasattr(scanner, "to_pandas"): + try: + df = scanner.to_pandas(blob_mode=blob_mode, **kwargs) + except TypeError as err: + message = str(err) + if "blob_mode" not in message and "unexpected keyword" not in message: + raise + df = scanner.to_pandas(**kwargs) + if schema is not None: + return _ensure_lazy_blob_frame(df, schema, blob_mode) + return df + + if hasattr(scanner, "to_pyarrow"): + reader = scanner.to_pyarrow() + tbl = reader.read_all() + elif hasattr(scanner, "to_table"): + tbl = scanner.to_table() + else: + reader = scanner.to_reader() + tbl = reader.read_all() + if blob_mode == "lazy" and _schema_has_blob_field(tbl.schema): + raise _unsupported_blob_pandas_error( + "the Lance scanner does not expose to_pandas" + ) + return tbl.to_pandas(**kwargs) + + # Pydantic validation function for vector queries def ensure_vector_query( val: Any, @@ -718,6 +867,7 @@ class LanceQueryBuilder(ABC): self, flatten: Optional[Union[int, bool]] = None, *, + blob_mode: BlobMode = "lazy", timeout: Optional[timedelta] = None, **kwargs, ) -> "pd.DataFrame": @@ -737,11 +887,39 @@ class LanceQueryBuilder(ABC): timeout: Optional[timedelta] The maximum time to wait for the query to complete. If None, wait indefinitely. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. + Vector, FTS, hybrid, and other non-native query shapes keep the + existing Arrow conversion path and only support blob descriptions. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ + _validate_blob_mode(blob_mode) + output_schema = getattr(self, "output_schema", None) + if output_schema is not None: + schema = output_schema() + if _blob_mode_requires_native_pandas(blob_mode, schema): + native_error = None + if flatten is None and timeout is None: + try: + df = self._plain_scan_to_pandas(blob_mode, **kwargs) + if df is not None: + return df + except Exception as err: + native_error = err + reason = ( + "this query shape cannot use Lance native pandas conversion" + if native_error is None + else str(native_error) + ) + raise _unsupported_blob_pandas_error(reason) from native_error + tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten) + if _blob_mode_requires_native_pandas(blob_mode, tbl.schema): + raise _unsupported_blob_pandas_error( + "this query shape cannot use Lance native pandas conversion" + ) return tbl.to_pandas(**kwargs) @abstractmethod @@ -1086,6 +1264,19 @@ class LanceQueryBuilder(ABC): """ raise NotImplementedError + def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + query = self.to_query_object() + if not _query_is_plain_scan(query): + return None + + dataset = self._table.to_lance() + scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode)) + return _scanner_to_pandas(scanner, blob_mode, **kwargs) + @abstractmethod def to_query_object(self) -> Query: """Return a serializable representation of the query @@ -2207,7 +2398,11 @@ class AsyncQueryBase(object): Base class for all async queries (take, scan, vector, fts, hybrid) """ - def __init__(self, inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery]): + def __init__( + self, + inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery], + table: Optional["AsyncTable"] = None, + ): """ Construct an AsyncQueryBase @@ -2215,6 +2410,7 @@ class AsyncQueryBase(object): [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ self._inner = inner + self._table = table def to_query_object(self) -> Query: """ @@ -2357,6 +2553,8 @@ class AsyncQueryBase(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + *, + blob_mode: BlobMode = "lazy", **kwargs, ) -> "pd.DataFrame": """ @@ -2390,13 +2588,55 @@ class AsyncQueryBase(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. + Vector, FTS, hybrid, and other non-native query shapes keep the + existing Arrow conversion path and only support blob descriptions. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ - return ( - flatten_columns(await self.to_arrow(timeout=timeout), flatten) - ).to_pandas(**kwargs) + _validate_blob_mode(blob_mode) + if hasattr(self._inner, "output_schema"): + schema = await self.output_schema() + if _blob_mode_requires_native_pandas(blob_mode, schema): + native_error = None + if flatten is None and timeout is None: + try: + df = await self._plain_scan_to_pandas(blob_mode, **kwargs) + if df is not None: + return df + except Exception as err: + native_error = err + reason = ( + "this query shape cannot use Lance native pandas conversion" + if native_error is None + else str(native_error) + ) + raise _unsupported_blob_pandas_error(reason) from native_error + + tbl = flatten_columns(await self.to_arrow(timeout=timeout), flatten) + if _blob_mode_requires_native_pandas(blob_mode, tbl.schema): + raise _unsupported_blob_pandas_error( + "this query shape cannot use Lance native pandas conversion" + ) + return tbl.to_pandas(**kwargs) + + async def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + if self._table is None: + return None + + query = self.to_query_object() + if not _query_is_plain_scan(query): + return None + + dataset = await self._table._to_lance() + scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode)) + return _scanner_to_pandas(scanner, blob_mode, **kwargs) async def to_polars( self, @@ -2503,14 +2743,18 @@ class AsyncStandardQuery(AsyncQueryBase): Base class for "standard" async queries (all but take currently) """ - def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]): + def __init__( + self, + inner: Union[LanceQuery, LanceVectorQuery], + table: Optional["AsyncTable"] = None, + ): """ Construct an AsyncStandardQuery This method is not intended to be called directly. Instead, use the [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ - super().__init__(inner) + super().__init__(inner, table) def where(self, predicate: Union[str, Expr]) -> Self: """ @@ -2616,14 +2860,14 @@ class AsyncStandardQuery(AsyncQueryBase): class AsyncQuery(AsyncStandardQuery): - def __init__(self, inner: LanceQuery): + def __init__(self, inner: LanceQuery, table: Optional["AsyncTable"] = None): """ Construct an AsyncQuery This method is not intended to be called directly. Instead, use the [AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query. """ - super().__init__(inner) + super().__init__(inner, table) self._inner = inner @classmethod @@ -2707,10 +2951,11 @@ class AsyncQuery(AsyncStandardQuery): new_self = self._inner.nearest_to(query_vectors[0]) for v in query_vectors[1:]: new_self.add_query_vector(v) - return AsyncVectorQuery(new_self) + return AsyncVectorQuery(new_self, self._table) else: return AsyncVectorQuery( - self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)) + self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)), + self._table, ) def nearest_to_text( @@ -2743,17 +2988,18 @@ class AsyncQuery(AsyncStandardQuery): if isinstance(query, str): return AsyncFTSQuery( - self._inner.nearest_to_text({"query": query, "columns": columns}) + self._inner.nearest_to_text({"query": query, "columns": columns}), + self._table, ) # FullTextQuery object - return AsyncFTSQuery(self._inner.nearest_to_text({"query": query})) + return AsyncFTSQuery(self._inner.nearest_to_text({"query": query}), self._table) class AsyncFTSQuery(AsyncStandardQuery): """A query for full text search for LanceDB.""" - def __init__(self, inner: LanceFTSQuery): - super().__init__(inner) + def __init__(self, inner: LanceFTSQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) self._inner = inner self._reranker = None @@ -2835,10 +3081,11 @@ class AsyncFTSQuery(AsyncStandardQuery): new_self = self._inner.nearest_to(query_vectors[0]) for v in query_vectors[1:]: new_self.add_query_vector(v) - return AsyncHybridQuery(new_self) + return AsyncHybridQuery(new_self, self._table) else: return AsyncHybridQuery( - self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)) + self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)), + self._table, ) async def to_batches( @@ -3029,7 +3276,7 @@ class AsyncVectorQueryBase: class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): - def __init__(self, inner: LanceVectorQuery): + def __init__(self, inner: LanceVectorQuery, table: Optional["AsyncTable"] = None): """ Construct an AsyncVectorQuery @@ -3039,7 +3286,7 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): a vector query. Or you can use [AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search] """ - super().__init__(inner) + super().__init__(inner, table) self._inner = inner self._reranker = None self._query_string = None @@ -3093,10 +3340,13 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase): if isinstance(query, str): return AsyncHybridQuery( - self._inner.nearest_to_text({"query": query, "columns": columns}) + self._inner.nearest_to_text({"query": query, "columns": columns}), + self._table, ) # FullTextQuery object - return AsyncHybridQuery(self._inner.nearest_to_text({"query": query})) + return AsyncHybridQuery( + self._inner.nearest_to_text({"query": query}), self._table + ) async def to_batches( self, @@ -3123,8 +3373,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase): in the `rerank` method to convert the scores to ranks and then normalize them. """ - def __init__(self, inner: LanceHybridQuery): - super().__init__(inner) + def __init__(self, inner: LanceHybridQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) self._inner = inner self._norm = "score" self._reranker = RRFReranker() @@ -3165,8 +3415,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase): max_batch_length: Optional[int] = None, timeout: Optional[timedelta] = None, ) -> AsyncRecordBatchReader: - fts_query = AsyncFTSQuery(self._inner.to_fts_query()) - vec_query = AsyncVectorQuery(self._inner.to_vector_query()) + fts_query = AsyncFTSQuery(self._inner.to_fts_query(), self._table) + vec_query = AsyncVectorQuery(self._inner.to_vector_query(), self._table) # save the row ID choice that was made on the query builder and force it # to actually fetch the row ids because we need this for reranking @@ -3266,8 +3516,15 @@ class AsyncTakeQuery(AsyncQueryBase): Builder for parameterizing and executing take queries. """ - def __init__(self, inner: LanceTakeQuery): - super().__init__(inner) + def __init__(self, inner: LanceTakeQuery, table: Optional["AsyncTable"] = None): + super().__init__(inner, table) + + async def _plain_scan_to_pandas( + self, + blob_mode: BlobMode, + **kwargs, + ) -> Optional["pd.DataFrame"]: + return None class BaseQueryBuilder(object): @@ -3400,6 +3657,8 @@ class BaseQueryBuilder(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + *, + blob_mode: BlobMode = "lazy", **kwargs, ) -> "pd.DataFrame": """ @@ -3433,11 +3692,15 @@ class BaseQueryBuilder(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + blob_mode: str, default "lazy" + Controls how blob columns are returned for plain scan queries. **kwargs Forwarded to pyarrow.Table.to_pandas after query execution and optional flattening. """ - return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs)) + return LOOP.run( + self._inner.to_pandas(flatten, timeout, blob_mode=blob_mode, **kwargs) + ) def to_polars( self, diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 7adc2cc54..eac48206b 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -89,6 +89,26 @@ from .index import lang_mapping BlobMode = Literal["lazy", "bytes", "descriptions"] +_VALID_BLOB_MODES = ("lazy", "bytes", "descriptions") + + +def _validate_blob_mode(blob_mode: BlobMode) -> None: + if blob_mode not in _VALID_BLOB_MODES: + modes = ", ".join(repr(mode) for mode in _VALID_BLOB_MODES) + raise ValueError(f"blob_mode must be one of {modes}, got {blob_mode!r}") + + +def _field_is_blob(field: pa.Field) -> bool: + metadata = field.metadata or {} + return metadata.get(b"lance-encoding:blob") == b"true" or ( + metadata.get("lance-encoding:blob") == "true" + ) + + +def _schema_has_blob_field(schema: pa.Schema) -> bool: + return any(_field_is_blob(field) for field in schema) + + _MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera") _MODEL_BACKED_TOKENIZER_ERRORS = ( "unknown base tokenizer", @@ -2294,9 +2314,14 @@ class LanceTable(Table): ------- pd.DataFrame """ - if blob_mode == "lazy" and ( - self._namespace_client is not None - or get_uri_scheme(self._dataset_path) == "memory" + _validate_blob_mode(blob_mode) + if blob_mode == "descriptions" or not _schema_has_blob_field(self.schema): + return self.to_arrow().to_pandas(**kwargs) + + if ( + blob_mode == "lazy" + and self._namespace_client is None + and get_uri_scheme(self._dataset_path) == "memory" ): return self.to_arrow().to_pandas(**kwargs) @@ -4317,7 +4342,7 @@ class AsyncTable: can be executed with methods like [to_arrow][lancedb.query.AsyncQuery.to_arrow], [to_pandas][lancedb.query.AsyncQuery.to_pandas] and more. """ - return AsyncQuery(self._inner.query()) + return AsyncQuery(self._inner.query(), self) async def _to_lance(self, **kwargs) -> lance.LanceDataset: try: @@ -4349,7 +4374,13 @@ class AsyncTable: ------- pd.DataFrame """ - if blob_mode == "lazy": + _validate_blob_mode(blob_mode) + if blob_mode == "descriptions" or not _schema_has_blob_field( + await self.schema() + ): + return (await self.to_arrow()).to_pandas(**kwargs) + + if blob_mode == "lazy" and get_uri_scheme(await self.uri()) == "memory": return (await self.to_arrow()).to_pandas(**kwargs) return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs) @@ -5393,7 +5424,7 @@ class AsyncTable: pa.RecordBatch A record batch containing the rows at the given offsets. """ - return AsyncTakeQuery(self._inner.take_offsets(offsets)) + return AsyncTakeQuery(self._inner.take_offsets(offsets), self) def take_row_ids(self, row_ids: list[int]) -> AsyncTakeQuery: """ @@ -5422,7 +5453,7 @@ class AsyncTable: AsyncTakeQuery A query object that can be executed to get the rows. """ - return AsyncTakeQuery(self._inner.take_row_ids(row_ids)) + return AsyncTakeQuery(self._inner.take_row_ids(row_ids), self) @property def tags(self) -> AsyncTags: diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index bbf7e4c6f..f9c923ad2 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -76,6 +76,35 @@ class TestNamespaceConnection: assert len(result) == 0 assert list(result.columns) == ["id", "vector", "text"] + def test_table_to_pandas_blob_lazy_through_namespace(self): + """Namespace-backed tables should use Lance blob-aware pandas conversion.""" + pytest.importorskip("lance") + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + db.create_namespace(["test_ns"]) + data = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", + pa.large_binary(), + metadata={"lance-encoding:blob": "true"}, + ), + ] + ), + ) + + table = db.create_table("blob_table", data, namespace_path=["test_ns"]) + df = table.to_pandas(blob_mode="lazy").sort_values("id") + + blob = df["blob"].iloc[0] + assert hasattr(blob, "readall") + assert blob.readall() == b"hello" + def test_open_table_through_namespace(self): """Test opening an existing table through namespace.""" db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) diff --git a/python/python/tests/test_query.py b/python/python/tests/test_query.py index 891ed808f..aa9468120 100644 --- a/python/python/tests/test_query.py +++ b/python/python/tests/test_query.py @@ -39,6 +39,35 @@ from utils import exception_output from importlib.util import find_spec +def _blob_query_data(): + return pa.table( + { + "id": pa.array([1, 2, 3, 4], pa.int64()), + "tag": pa.array(["drop", "keep", "keep", "keep"], pa.utf8()), + "vector": pa.array( + [[1.0, 0.0], [2.0, 0.0], [3.0, 0.0], [4.0, 0.0]], + type=pa.list_(pa.float32(), list_size=2), + ), + "blob": pa.array([b"one", b"two", b"three", b"four"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("tag", pa.utf8()), + pa.field("vector", pa.list_(pa.float32(), list_size=2)), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + + +def _assert_lazy_blob(value, expected: bytes): + assert hasattr(value, "readall") + assert value.readall() == expected + + @pytest.fixture(scope="module") def table(tmpdir_factory) -> lancedb.table.Table: tmp_path = str(tmpdir_factory.mktemp("data")) @@ -181,6 +210,138 @@ async def test_query_to_pandas_kwargs(table, table_async): assert async_df["id"].tolist() == [1, 2] +@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"]) +def test_plain_scan_query_to_pandas_blob_modes(tmp_db, blob_mode): + pytest.importorskip("lance") + table = tmp_db.create_table( + f"test_query_to_pandas_blob_{blob_mode}", _blob_query_data() + ) + + df = ( + table.search() + .select(["id", "blob"]) + .where("id = 1") + .to_pandas(blob_mode=blob_mode) + ) + + assert df["id"].tolist() == [1] + if blob_mode == "lazy": + _assert_lazy_blob(df["blob"].iloc[0], b"one") + elif blob_mode == "bytes": + assert df["blob"].tolist() == [b"one"] + else: + first = df["blob"].iloc[0] + assert first != b"one" + assert not hasattr(first, "readall") + + +def test_plain_scan_query_to_pandas_blob_projection(tmp_db): + pytest.importorskip("lance") + table = tmp_db.create_table( + "test_query_to_pandas_blob_projection", _blob_query_data() + ) + + df = ( + table.search() + .where("id >= 2") + .select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"}) + .limit(2) + .offset(1) + .to_pandas(blob_mode="bytes") + ) + + assert df["id_alias"].tolist() == [3, 4] + assert df["payload"].tolist() == [b"three", b"four"] + assert df["double_id"].tolist() == [6, 8] + + +def test_plain_scan_query_to_pandas_blob_mode_does_not_collect_arrow( + tmp_db, monkeypatch +): + pytest.importorskip("lance") + table = tmp_db.create_table( + "test_query_to_pandas_blob_no_arrow_collect", _blob_query_data() + ) + query = table.search().where("id = 1").select(["id", "blob"]) + + def fail_to_arrow(*args, **kwargs): + raise AssertionError("to_arrow should not be called before native pandas") + + monkeypatch.setattr(query, "to_arrow", fail_to_arrow) + + df = query.to_pandas(blob_mode="bytes") + + assert df["id"].tolist() == [1] + assert df["blob"].tolist() == [b"one"] + + +@pytest.mark.asyncio +async def test_async_plain_scan_query_to_pandas_blob_projection(tmp_db_async): + pytest.importorskip("lance") + table = await tmp_db_async.create_table( + "test_async_query_to_pandas_blob_projection", _blob_query_data() + ) + + lazy_df = await ( + table.query().where("id = 1").select(["id", "blob"]).to_pandas(blob_mode="lazy") + ) + assert lazy_df["id"].tolist() == [1] + _assert_lazy_blob(lazy_df["blob"].iloc[0], b"one") + + bytes_df = await ( + table.query() + .where("id >= 2") + .select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"}) + .limit(2) + .offset(1) + .to_pandas(blob_mode="bytes") + ) + assert bytes_df["id_alias"].tolist() == [3, 4] + assert bytes_df["payload"].tolist() == [b"three", b"four"] + assert bytes_df["double_id"].tolist() == [6, 8] + + desc_df = await ( + table.query() + .where("id = 1") + .select(["blob"]) + .to_pandas(blob_mode="descriptions") + ) + first = desc_df["blob"].iloc[0] + assert first != b"one" + assert not hasattr(first, "readall") + + +@pytest.mark.asyncio +async def test_async_plain_scan_query_to_pandas_blob_mode_does_not_collect_arrow( + tmp_db_async, monkeypatch +): + pytest.importorskip("lance") + table = await tmp_db_async.create_table( + "test_async_query_to_pandas_blob_no_arrow_collect", _blob_query_data() + ) + query = table.query().where("id = 1").select(["id", "blob"]) + + async def fail_to_arrow(*args, **kwargs): + raise AssertionError("to_arrow should not be called before native pandas") + + monkeypatch.setattr(query, "to_arrow", fail_to_arrow) + + df = await query.to_pandas(blob_mode="bytes") + + assert df["id"].tolist() == [1] + assert df["blob"].tolist() == [b"one"] + + +def test_vector_query_to_pandas_blob_mode_requires_native_path(tmp_db): + pytest.importorskip("lance") + table = tmp_db.create_table("test_vector_query_blob_mode", _blob_query_data()) + + with pytest.raises(RuntimeError, match="Lance native pandas conversion"): + table.search([1.0, 0.0]).select(["blob", "vector"]).limit(1).to_pandas( + blob_mode="lazy" + ) + + def test_order_by_plain_query(mem_db): table = mem_db.create_table( "test_order_by", diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index 964f6b904..5892dda42 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -26,6 +26,28 @@ from lancedb.table import LanceTable from pydantic import BaseModel +def _blob_test_data(): + return pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + + +def _assert_lazy_blob(value, expected: bytes): + assert hasattr(value, "readall") + assert value.readall() == expected + + def test_basic(mem_db: DBConnection): data = [ {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, @@ -57,27 +79,30 @@ def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection): pd.testing.assert_frame_equal(table.to_pandas(), expected) -def test_table_to_pandas_blob_bytes(tmp_db: DBConnection): +def test_table_to_pandas_invalid_blob_mode_non_blob_table(tmp_db: DBConnection): + data = pa.table({"id": [1, 2], "text": ["one", "two"]}) + table = tmp_db.create_table("test_to_pandas_invalid_blob_mode", data=data) + + with pytest.raises(ValueError, match="blob_mode must be one of"): + table.to_pandas(blob_mode="invalid") + + +@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"]) +def test_table_to_pandas_blob_modes(tmp_db: DBConnection, blob_mode): pytest.importorskip("lance") - data = pa.table( - { - "id": pa.array([1, 2], pa.int64()), - "blob": pa.array([b"hello", b"world"], pa.large_binary()), - }, - schema=pa.schema( - [ - pa.field("id", pa.int64()), - pa.field( - "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} - ), - ] - ), - ) - table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data) + table = tmp_db.create_table(f"test_to_pandas_blob_{blob_mode}", _blob_test_data()) - df = table.to_pandas(blob_mode="bytes") + df = table.to_pandas(blob_mode=blob_mode) - assert df["blob"].tolist() == [b"hello", b"world"] + if blob_mode == "lazy": + _assert_lazy_blob(df["blob"].iloc[0], b"hello") + _assert_lazy_blob(df["blob"].iloc[1], b"world") + elif blob_mode == "bytes": + assert df["blob"].tolist() == [b"hello", b"world"] + else: + first = df["blob"].iloc[0] + assert first != b"hello" + assert not hasattr(first, "readall") def test_table_to_pandas_kwargs(tmp_db: DBConnection): @@ -93,22 +118,8 @@ def test_table_to_pandas_kwargs(tmp_db: DBConnection): @pytest.mark.asyncio async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection): pytest.importorskip("lance") - data = pa.table( - { - "id": pa.array([1, 2], pa.int64()), - "blob": pa.array([b"hello", b"world"], pa.large_binary()), - }, - schema=pa.schema( - [ - pa.field("id", pa.int64()), - pa.field( - "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} - ), - ] - ), - ) table = await tmp_db_async.create_table( - "test_async_to_pandas_blob_bytes", data=data + "test_async_to_pandas_blob_bytes", data=_blob_test_data() ) df = await table.to_pandas(blob_mode="bytes") @@ -116,6 +127,19 @@ async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection): assert df["blob"].tolist() == [b"hello", b"world"] +@pytest.mark.asyncio +async def test_async_table_to_pandas_invalid_blob_mode_non_blob_table( + tmp_db_async: AsyncConnection, +): + table = await tmp_db_async.create_table( + "test_async_to_pandas_invalid_blob_mode", + data=pa.table({"id": [1, 2], "text": ["one", "two"]}), + ) + + with pytest.raises(ValueError, match="blob_mode must be one of"): + await table.to_pandas(blob_mode="invalid") + + @pytest.mark.asyncio async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection): pd = pytest.importorskip("pandas")