diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 0dc213857..cd52a7ec9 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -205,7 +205,7 @@ jobs: - name: Delete wheels run: rm -rf target/wheels pydantic1x: - timeout-minutes: 30 + timeout-minutes: 60 runs-on: "ubuntu-24.04" defaults: run: diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index f472f8bb7..04b13add1 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -718,6 +718,7 @@ class LanceQueryBuilder(ABC): flatten: Optional[Union[int, bool]] = None, *, timeout: Optional[timedelta] = None, + **kwargs, ) -> "pd.DataFrame": """ Execute the query and return the results as a pandas DataFrame. @@ -735,9 +736,12 @@ class LanceQueryBuilder(ABC): timeout: Optional[timedelta] The maximum time to wait for the query to complete. If None, wait indefinitely. + **kwargs + Forwarded to pyarrow.Table.to_pandas after query execution and + optional flattening. """ tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten) - return tbl.to_pandas() + return tbl.to_pandas(**kwargs) @abstractmethod def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table: @@ -2352,6 +2356,7 @@ class AsyncQueryBase(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + **kwargs, ) -> "pd.DataFrame": """ Execute the query and collect the results into a pandas DataFrame. @@ -2384,10 +2389,13 @@ class AsyncQueryBase(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + **kwargs + Forwarded to pyarrow.Table.to_pandas after query execution and + optional flattening. """ return ( flatten_columns(await self.to_arrow(timeout=timeout), flatten) - ).to_pandas() + ).to_pandas(**kwargs) async def to_polars( self, @@ -3389,6 +3397,7 @@ class BaseQueryBuilder(object): self, flatten: Optional[Union[int, bool]] = None, timeout: Optional[timedelta] = None, + **kwargs, ) -> "pd.DataFrame": """ Execute the query and collect the results into a pandas DataFrame. @@ -3421,8 +3430,11 @@ class BaseQueryBuilder(object): The maximum time to wait for the query to complete. If not specified, no timeout is applied. If the query does not complete within the specified time, an error will be raised. + **kwargs + Forwarded to pyarrow.Table.to_pandas after query execution and + optional flattening. """ - return LOOP.run(self._inner.to_pandas(flatten, timeout)) + return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs)) def to_polars( self, diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 6f81d1628..c2fdcfae9 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -40,7 +40,7 @@ from lancedb.embeddings import EmbeddingFunctionRegistry from lancedb.table import _normalize_progress from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder -from ..table import AsyncTable, IndexStatistics, Query, Table, Tags +from ..table import AsyncTable, BlobMode, IndexStatistics, Query, Table, Tags from ..types import BaseTokenizerType @@ -101,7 +101,7 @@ class RemoteTable(Table): """to_arrow() is not yet supported on LanceDB cloud.""" raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.") - def to_pandas(self): + def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs): """to_pandas() is not yet supported on LanceDB cloud.""" raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.") diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 6c4b3eff9..8f39b5c71 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -87,6 +87,8 @@ from .util import ( ) from .index import lang_mapping +BlobMode = Literal["lazy", "bytes", "descriptions"] + _MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera") _MODEL_BACKED_TOKENIZER_ERRORS = ( "unknown base tokenizer", @@ -760,14 +762,22 @@ class Table(ABC): """ raise NotImplementedError - def to_pandas(self) -> "pandas.DataFrame": + def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pandas.DataFrame": """Return the table as a pandas DataFrame. + Parameters + ---------- + blob_mode: str, default "lazy" + Controls how blob columns are returned for backends that support + Lance blob-aware pandas conversion. + **kwargs + Forwarded to PyArrow / Lance pandas conversion. + Returns ------- pd.DataFrame """ - return self.to_arrow().to_pandas() + return self.to_arrow().to_pandas(**kwargs) @abstractmethod def to_arrow(self) -> pa.Table: @@ -2183,14 +2193,27 @@ class LanceTable(Table): """Return the first n rows of the table.""" return LOOP.run(self._table.head(n)) - def to_pandas(self) -> "pd.DataFrame": + def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame": """Return the table as a pandas DataFrame. + Parameters + ---------- + blob_mode: str, default "lazy" + Controls how Lance blob columns are returned. + **kwargs + Forwarded to Lance pandas conversion. + Returns ------- pd.DataFrame """ - return self.to_arrow().to_pandas() + if blob_mode == "lazy" and ( + self._namespace_client is not None + or get_uri_scheme(self._dataset_path) == "memory" + ): + return self.to_arrow().to_pandas(**kwargs) + + return self.to_lance().to_pandas(blob_mode=blob_mode, **kwargs) def to_arrow(self) -> pa.Table: """Return the table as a pyarrow Table. @@ -3945,14 +3968,39 @@ class AsyncTable: """ return AsyncQuery(self._inner.query()) - async def to_pandas(self) -> "pd.DataFrame": + async def _to_lance(self, **kwargs) -> lance.LanceDataset: + try: + import lance + except ImportError: + raise ImportError( + "The lance library is required to use this function. " + "Please install with `pip install pylance`." + ) + + return lance.dataset( + await self.uri(), + version=await self.version(), + storage_options=await self.latest_storage_options(), + **kwargs, + ) + + async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame": """Return the table as a pandas DataFrame. + Parameters + ---------- + blob_mode: str, default "lazy" + Controls how Lance blob columns are returned. + **kwargs + Forwarded to PyArrow / Lance pandas conversion. + Returns ------- pd.DataFrame """ - return (await self.to_arrow()).to_pandas() + if blob_mode == "lazy": + return (await self.to_arrow()).to_pandas(**kwargs) + return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs) async def to_arrow(self) -> pa.Table: """Return the table as a pyarrow Table. diff --git a/python/python/tests/test_query.py b/python/python/tests/test_query.py index e5ba9e5ae..febb7e784 100644 --- a/python/python/tests/test_query.py +++ b/python/python/tests/test_query.py @@ -165,6 +165,22 @@ def test_offset(table): assert len(results_with_offset.to_pandas()) == 1 +@pytest.mark.asyncio +async def test_query_to_pandas_kwargs(table, table_async): + sync_df = ( + LanceVectorQueryBuilder(table, [0, 0], "vector") + .select(["id"]) + .limit(1) + .to_pandas(split_blocks=True) + ) + assert sync_df["id"].tolist() == [1] + + async_df = await ( + table_async.query().select(["id"]).limit(2).to_pandas(split_blocks=True) + ) + assert async_df["id"].tolist() == [1, 2] + + def test_order_by_plain_query(mem_db): table = mem_db.create_table( "test_order_by", diff --git a/python/python/tests/test_remote_db.py b/python/python/tests/test_remote_db.py index bc69a0410..5639b8bba 100644 --- a/python/python/tests/test_remote_db.py +++ b/python/python/tests/test_remote_db.py @@ -269,6 +269,25 @@ def test_table_unimplemented_functions(): table.to_pandas() +def test_table_to_pandas_not_supported(): + def handler(request): + if request.path == "/v1/table/test/create/?mode=create": + request.send_response(200) + request.send_header("Content-Type", "application/json") + request.end_headers() + request.wfile.write(b"{}") + else: + request.send_response(404) + request.end_headers() + + with mock_lancedb_connection(handler) as db: + table = db.create_table("test", [{"id": 1}]) + with pytest.raises(NotImplementedError): + table.to_pandas() + with pytest.raises(NotImplementedError): + table.to_pandas(blob_mode="bytes", split_blocks=True) + + def test_table_add_in_threadpool(): def handler(request): if request.path == "/v1/table/test/insert/": diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index 3e27d0e69..fcb30f791 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -47,6 +47,85 @@ def test_basic(mem_db: DBConnection): assert table.to_arrow() == expected_data +def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection): + pd = pytest.importorskip("pandas") + data = pa.table({"id": [1, 2], "text": ["one", "two"]}) + table = tmp_db.create_table("test_to_pandas_old_call", data=data) + + expected = data.to_pandas() + pd.testing.assert_frame_equal(table.to_pandas(), expected) + + +def test_table_to_pandas_blob_bytes(tmp_db: DBConnection): + pytest.importorskip("lance") + data = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data) + + df = table.to_pandas(blob_mode="bytes") + + assert df["blob"].tolist() == [b"hello", b"world"] + + +def test_table_to_pandas_kwargs(tmp_db: DBConnection): + pd = pytest.importorskip("pandas") + data = pa.table({"id": pa.array([1, 2], pa.int64())}) + table = tmp_db.create_table("test_to_pandas_kwargs", data=data) + + df = table.to_pandas(types_mapper=pd.ArrowDtype) + + assert str(df["id"].dtype) == "int64[pyarrow]" + + +@pytest.mark.asyncio +async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection): + pytest.importorskip("lance") + data = pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "blob": pa.array([b"hello", b"world"], pa.large_binary()), + }, + schema=pa.schema( + [ + pa.field("id", pa.int64()), + pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ), + ] + ), + ) + table = await tmp_db_async.create_table( + "test_async_to_pandas_blob_bytes", data=data + ) + + df = await table.to_pandas(blob_mode="bytes") + + assert df["blob"].tolist() == [b"hello", b"world"] + + +@pytest.mark.asyncio +async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection): + pd = pytest.importorskip("pandas") + data = pa.table({"id": pa.array([1, 2], pa.int64())}) + table = await tmp_db_async.create_table("test_async_to_pandas_kwargs", data=data) + + df = await table.to_pandas(types_mapper=pd.ArrowDtype) + + assert str(df["id"].dtype) == "int64[pyarrow]" + + def test_create_table_infers_large_int_vectors(mem_db: DBConnection): data = [{"vector": [0, 300]}]