mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-21 05:50:41 +00:00
feat(python): align to_pandas pandas kwargs (#3397)
## Feature This PR aligns LanceDB Python `to_pandas()` APIs with Lance pandas conversion capabilities while keeping LanceDB query-specific semantics intact. - Adds `blob_mode` and pandas `**kwargs` support to local table `to_pandas()`. - Delegates local `LanceTable.to_pandas()` to Lance dataset `to_pandas(blob_mode=..., **kwargs)`. - Keeps remote table `to_pandas()` unsupported with `NotImplementedError`. - Allows sync and async query `to_pandas()` to forward pandas kwargs after LanceDB `flatten` and `timeout` handling. Why we need this feature: Users can access Lance blob-aware pandas conversion from LanceDB local tables and can pass PyArrow pandas conversion options through table/query APIs without losing existing `flatten` or `timeout` behavior. How it works: The table API exposes a `BlobMode` literal type for `lazy`, `bytes`, and `descriptions`. Local tables call through to the backing Lance dataset. Query APIs do not add `blob_mode`; they materialize Arrow results, apply LanceDB flattening when requested, and then call `to_pandas(**kwargs)`. ## Validation - `uv run --frozen --extra tests pytest python/tests/test_table.py::test_table_to_pandas_default_matches_arrow python/tests/test_table.py::test_table_to_pandas_blob_bytes python/tests/test_table.py::test_table_to_pandas_kwargs python/tests/test_query.py::test_query_to_pandas_kwargs python/tests/test_query.py::test_query_timeout python/tests/test_remote_db.py::test_table_to_pandas_not_supported` - `uv run --frozen --extra dev ruff check python/lancedb/table.py python/lancedb/query.py python/lancedb/remote/table.py python/tests/test_table.py python/tests/test_query.py python/tests/test_remote_db.py` - `uv run --frozen --extra tests pytest python/tests/test_table.py python/tests/test_query.py python/tests/test_remote_db.py` Note: `python/uv.lock` was intentionally not committed in this branch.
This commit is contained in:
2
.github/workflows/python.yml
vendored
2
.github/workflows/python.yml
vendored
@@ -205,7 +205,7 @@ jobs:
|
||||
- name: Delete wheels
|
||||
run: rm -rf target/wheels
|
||||
pydantic1x:
|
||||
timeout-minutes: 30
|
||||
timeout-minutes: 60
|
||||
runs-on: "ubuntu-24.04"
|
||||
defaults:
|
||||
run:
|
||||
|
||||
@@ -718,6 +718,7 @@ class LanceQueryBuilder(ABC):
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
*,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and return the results as a pandas DataFrame.
|
||||
@@ -735,9 +736,12 @@ class LanceQueryBuilder(ABC):
|
||||
timeout: Optional[timedelta]
|
||||
The maximum time to wait for the query to complete.
|
||||
If None, wait indefinitely.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten)
|
||||
return tbl.to_pandas()
|
||||
return tbl.to_pandas(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table:
|
||||
@@ -2352,6 +2356,7 @@ class AsyncQueryBase(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and collect the results into a pandas DataFrame.
|
||||
@@ -2384,10 +2389,13 @@ class AsyncQueryBase(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return (
|
||||
flatten_columns(await self.to_arrow(timeout=timeout), flatten)
|
||||
).to_pandas()
|
||||
).to_pandas(**kwargs)
|
||||
|
||||
async def to_polars(
|
||||
self,
|
||||
@@ -3389,6 +3397,7 @@ class BaseQueryBuilder(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and collect the results into a pandas DataFrame.
|
||||
@@ -3421,8 +3430,11 @@ class BaseQueryBuilder(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return LOOP.run(self._inner.to_pandas(flatten, timeout))
|
||||
return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs))
|
||||
|
||||
def to_polars(
|
||||
self,
|
||||
|
||||
@@ -40,7 +40,7 @@ from lancedb.embeddings import EmbeddingFunctionRegistry
|
||||
from lancedb.table import _normalize_progress
|
||||
|
||||
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder
|
||||
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
|
||||
from ..table import AsyncTable, BlobMode, IndexStatistics, Query, Table, Tags
|
||||
from ..types import BaseTokenizerType
|
||||
|
||||
|
||||
@@ -101,7 +101,7 @@ class RemoteTable(Table):
|
||||
"""to_arrow() is not yet supported on LanceDB cloud."""
|
||||
raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.")
|
||||
|
||||
def to_pandas(self):
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs):
|
||||
"""to_pandas() is not yet supported on LanceDB cloud."""
|
||||
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
|
||||
|
||||
|
||||
@@ -87,6 +87,8 @@ from .util import (
|
||||
)
|
||||
from .index import lang_mapping
|
||||
|
||||
BlobMode = Literal["lazy", "bytes", "descriptions"]
|
||||
|
||||
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
|
||||
_MODEL_BACKED_TOKENIZER_ERRORS = (
|
||||
"unknown base tokenizer",
|
||||
@@ -760,14 +762,22 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def to_pandas(self) -> "pandas.DataFrame":
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pandas.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how blob columns are returned for backends that support
|
||||
Lance blob-aware pandas conversion.
|
||||
**kwargs
|
||||
Forwarded to PyArrow / Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return self.to_arrow().to_pandas()
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def to_arrow(self) -> pa.Table:
|
||||
@@ -2183,14 +2193,27 @@ class LanceTable(Table):
|
||||
"""Return the first n rows of the table."""
|
||||
return LOOP.run(self._table.head(n))
|
||||
|
||||
def to_pandas(self) -> "pd.DataFrame":
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how Lance blob columns are returned.
|
||||
**kwargs
|
||||
Forwarded to Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return self.to_arrow().to_pandas()
|
||||
if blob_mode == "lazy" and (
|
||||
self._namespace_client is not None
|
||||
or get_uri_scheme(self._dataset_path) == "memory"
|
||||
):
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
return self.to_lance().to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
|
||||
def to_arrow(self) -> pa.Table:
|
||||
"""Return the table as a pyarrow Table.
|
||||
@@ -3945,14 +3968,39 @@ class AsyncTable:
|
||||
"""
|
||||
return AsyncQuery(self._inner.query())
|
||||
|
||||
async def to_pandas(self) -> "pd.DataFrame":
|
||||
async def _to_lance(self, **kwargs) -> lance.LanceDataset:
|
||||
try:
|
||||
import lance
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"The lance library is required to use this function. "
|
||||
"Please install with `pip install pylance`."
|
||||
)
|
||||
|
||||
return lance.dataset(
|
||||
await self.uri(),
|
||||
version=await self.version(),
|
||||
storage_options=await self.latest_storage_options(),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how Lance blob columns are returned.
|
||||
**kwargs
|
||||
Forwarded to PyArrow / Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return (await self.to_arrow()).to_pandas()
|
||||
if blob_mode == "lazy":
|
||||
return (await self.to_arrow()).to_pandas(**kwargs)
|
||||
return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
|
||||
async def to_arrow(self) -> pa.Table:
|
||||
"""Return the table as a pyarrow Table.
|
||||
|
||||
@@ -165,6 +165,22 @@ def test_offset(table):
|
||||
assert len(results_with_offset.to_pandas()) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_to_pandas_kwargs(table, table_async):
|
||||
sync_df = (
|
||||
LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
.select(["id"])
|
||||
.limit(1)
|
||||
.to_pandas(split_blocks=True)
|
||||
)
|
||||
assert sync_df["id"].tolist() == [1]
|
||||
|
||||
async_df = await (
|
||||
table_async.query().select(["id"]).limit(2).to_pandas(split_blocks=True)
|
||||
)
|
||||
assert async_df["id"].tolist() == [1, 2]
|
||||
|
||||
|
||||
def test_order_by_plain_query(mem_db):
|
||||
table = mem_db.create_table(
|
||||
"test_order_by",
|
||||
|
||||
@@ -269,6 +269,25 @@ def test_table_unimplemented_functions():
|
||||
table.to_pandas()
|
||||
|
||||
|
||||
def test_table_to_pandas_not_supported():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=create":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"{}")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.create_table("test", [{"id": 1}])
|
||||
with pytest.raises(NotImplementedError):
|
||||
table.to_pandas()
|
||||
with pytest.raises(NotImplementedError):
|
||||
table.to_pandas(blob_mode="bytes", split_blocks=True)
|
||||
|
||||
|
||||
def test_table_add_in_threadpool():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/insert/":
|
||||
|
||||
@@ -47,6 +47,85 @@ def test_basic(mem_db: DBConnection):
|
||||
assert table.to_arrow() == expected_data
|
||||
|
||||
|
||||
def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": [1, 2], "text": ["one", "two"]})
|
||||
table = tmp_db.create_table("test_to_pandas_old_call", data=data)
|
||||
|
||||
expected = data.to_pandas()
|
||||
pd.testing.assert_frame_equal(table.to_pandas(), expected)
|
||||
|
||||
|
||||
def test_table_to_pandas_blob_bytes(tmp_db: DBConnection):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data)
|
||||
|
||||
df = table.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
|
||||
|
||||
def test_table_to_pandas_kwargs(tmp_db: DBConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": pa.array([1, 2], pa.int64())})
|
||||
table = tmp_db.create_table("test_to_pandas_kwargs", data=data)
|
||||
|
||||
df = table.to_pandas(types_mapper=pd.ArrowDtype)
|
||||
|
||||
assert str(df["id"].dtype) == "int64[pyarrow]"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_to_pandas_blob_bytes", data=data
|
||||
)
|
||||
|
||||
df = await table.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": pa.array([1, 2], pa.int64())})
|
||||
table = await tmp_db_async.create_table("test_async_to_pandas_kwargs", data=data)
|
||||
|
||||
df = await table.to_pandas(types_mapper=pd.ArrowDtype)
|
||||
|
||||
assert str(df["id"].dtype) == "int64[pyarrow]"
|
||||
|
||||
|
||||
def test_create_table_infers_large_int_vectors(mem_db: DBConnection):
|
||||
data = [{"vector": [0, 300]}]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user