mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-02 20:00:46 +00:00
feat(python): support blob modes in query to_pandas
This commit is contained in:
@@ -41,6 +41,14 @@ from .rerankers.rrf import RRFReranker
|
||||
from .rerankers.util import check_reranker_result
|
||||
from .util import flatten_columns
|
||||
|
||||
BlobMode = Literal["lazy", "bytes", "descriptions"]
|
||||
|
||||
_BLOB_MODE_TO_HANDLING = {
|
||||
"lazy": "blobs_descriptions",
|
||||
"bytes": "all_binary",
|
||||
"descriptions": "blobs_descriptions",
|
||||
}
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import sys
|
||||
|
||||
@@ -55,7 +63,7 @@ if TYPE_CHECKING:
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from .common import VEC
|
||||
from .pydantic import LanceModel
|
||||
from .table import Table
|
||||
from .table import AsyncTable, Table
|
||||
|
||||
if sys.version_info >= (3, 11):
|
||||
from typing import Self
|
||||
@@ -65,6 +73,147 @@ if TYPE_CHECKING:
|
||||
T = TypeVar("T", bound="LanceModel")
|
||||
|
||||
|
||||
def _validate_blob_mode(blob_mode: BlobMode) -> None:
|
||||
if blob_mode not in _BLOB_MODE_TO_HANDLING:
|
||||
modes = ", ".join(repr(mode) for mode in _BLOB_MODE_TO_HANDLING)
|
||||
raise ValueError(f"blob_mode must be one of {modes}, got {blob_mode!r}")
|
||||
|
||||
|
||||
def _field_is_blob(field: pa.Field) -> bool:
|
||||
metadata = field.metadata or {}
|
||||
return metadata.get(b"lance-encoding:blob") == b"true" or (
|
||||
metadata.get("lance-encoding:blob") == "true"
|
||||
)
|
||||
|
||||
|
||||
def _schema_has_blob_field(schema: pa.Schema) -> bool:
|
||||
return any(_field_is_blob(field) for field in schema)
|
||||
|
||||
|
||||
def _blob_mode_requires_native_pandas(blob_mode: BlobMode, schema: pa.Schema) -> bool:
|
||||
return blob_mode in ("lazy", "bytes") and _schema_has_blob_field(schema)
|
||||
|
||||
|
||||
def _unsupported_blob_pandas_error(reason: str) -> RuntimeError:
|
||||
return RuntimeError(
|
||||
"blob_mode='lazy' and blob_mode='bytes' require Lance native pandas "
|
||||
f"conversion for queries that return blob columns, but {reason}. "
|
||||
"Use blob_mode='descriptions' or remove blob columns from the projection."
|
||||
)
|
||||
|
||||
|
||||
def _query_is_plain_scan(query: Query) -> bool:
|
||||
return (
|
||||
query.vector is None
|
||||
and query.full_text_query is None
|
||||
and not query.postfilter
|
||||
and not query.order_by
|
||||
)
|
||||
|
||||
|
||||
def _filter_to_sql(filter: Optional[Union[str, Expr]]) -> Optional[str]:
|
||||
if filter is None:
|
||||
return None
|
||||
if isinstance(filter, Expr):
|
||||
return filter.to_sql()
|
||||
return filter
|
||||
|
||||
|
||||
def _projection_to_scanner_kwargs(
|
||||
columns: Optional[
|
||||
Union[
|
||||
List[str], List[Tuple[str, Union[str, Expr]]], Dict[str, Union[str, Expr]]
|
||||
]
|
||||
],
|
||||
) -> Dict[str, Any]:
|
||||
if columns is None:
|
||||
return {}
|
||||
if isinstance(columns, list):
|
||||
if all(isinstance(column, str) for column in columns):
|
||||
return {"columns": columns}
|
||||
if all(isinstance(column, tuple) and len(column) == 2 for column in columns):
|
||||
return {
|
||||
"columns": {
|
||||
name: expr.to_sql() if isinstance(expr, Expr) else expr
|
||||
for name, expr in columns
|
||||
}
|
||||
}
|
||||
# Let Lance raise the detailed projection validation error.
|
||||
return {"columns": columns}
|
||||
|
||||
projection = {}
|
||||
for name, expr in columns.items():
|
||||
if isinstance(expr, Expr):
|
||||
expr = expr.to_sql()
|
||||
projection[name] = expr
|
||||
return {"columns": projection}
|
||||
|
||||
|
||||
def _scanner_kwargs_for_query(query: Query, blob_mode: BlobMode) -> Dict[str, Any]:
|
||||
kwargs = {
|
||||
**_projection_to_scanner_kwargs(query.columns),
|
||||
"filter": _filter_to_sql(query.filter),
|
||||
"limit": query.limit,
|
||||
"offset": query.offset,
|
||||
"with_row_id": query.with_row_id,
|
||||
"fast_search": query.fast_search,
|
||||
"blob_handling": _BLOB_MODE_TO_HANDLING[blob_mode],
|
||||
}
|
||||
return {key: value for key, value in kwargs.items() if value is not None}
|
||||
|
||||
|
||||
def _ensure_lazy_blob_frame(
|
||||
df: "pd.DataFrame", schema: pa.Schema, blob_mode: BlobMode
|
||||
) -> "pd.DataFrame":
|
||||
if blob_mode != "lazy" or not _schema_has_blob_field(schema) or len(df) == 0:
|
||||
return df
|
||||
|
||||
for field in schema:
|
||||
if not _field_is_blob(field) or field.name not in df.columns:
|
||||
continue
|
||||
value = df[field.name].iloc[0]
|
||||
if value is not None and not hasattr(value, "readall"):
|
||||
raise _unsupported_blob_pandas_error(
|
||||
"the Lance scanner did not return lazy blob files"
|
||||
)
|
||||
return df
|
||||
|
||||
|
||||
def _scanner_to_pandas(scanner: Any, blob_mode: BlobMode, **kwargs) -> "pd.DataFrame":
|
||||
schema = getattr(scanner, "projected_schema", None)
|
||||
if schema is None:
|
||||
schema = getattr(scanner, "schema", None)
|
||||
if schema is None:
|
||||
schema = getattr(scanner, "dataset_schema", None)
|
||||
if callable(schema):
|
||||
schema = schema()
|
||||
if hasattr(scanner, "to_pandas"):
|
||||
try:
|
||||
df = scanner.to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
except TypeError as err:
|
||||
message = str(err)
|
||||
if "blob_mode" not in message and "unexpected keyword" not in message:
|
||||
raise
|
||||
df = scanner.to_pandas(**kwargs)
|
||||
if schema is not None:
|
||||
return _ensure_lazy_blob_frame(df, schema, blob_mode)
|
||||
return df
|
||||
|
||||
if hasattr(scanner, "to_pyarrow"):
|
||||
reader = scanner.to_pyarrow()
|
||||
tbl = reader.read_all()
|
||||
elif hasattr(scanner, "to_table"):
|
||||
tbl = scanner.to_table()
|
||||
else:
|
||||
reader = scanner.to_reader()
|
||||
tbl = reader.read_all()
|
||||
if blob_mode == "lazy" and _schema_has_blob_field(tbl.schema):
|
||||
raise _unsupported_blob_pandas_error(
|
||||
"the Lance scanner does not expose to_pandas"
|
||||
)
|
||||
return tbl.to_pandas(**kwargs)
|
||||
|
||||
|
||||
# Pydantic validation function for vector queries
|
||||
def ensure_vector_query(
|
||||
val: Any,
|
||||
@@ -718,6 +867,7 @@ class LanceQueryBuilder(ABC):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
*,
|
||||
blob_mode: BlobMode = "lazy",
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
@@ -737,11 +887,31 @@ class LanceQueryBuilder(ABC):
|
||||
timeout: Optional[timedelta]
|
||||
The maximum time to wait for the query to complete.
|
||||
If None, wait indefinitely.
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how blob columns are returned for plain scan queries.
|
||||
Vector, FTS, hybrid, and other non-native query shapes keep the
|
||||
existing Arrow conversion path and only support blob descriptions.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
_validate_blob_mode(blob_mode)
|
||||
native_error = None
|
||||
tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten)
|
||||
if _blob_mode_requires_native_pandas(blob_mode, tbl.schema):
|
||||
if flatten is None and timeout is None:
|
||||
try:
|
||||
df = self._plain_scan_to_pandas(blob_mode, **kwargs)
|
||||
if df is not None:
|
||||
return df
|
||||
except Exception as err:
|
||||
native_error = err
|
||||
reason = (
|
||||
"this query shape cannot use Lance native pandas conversion"
|
||||
if native_error is None
|
||||
else str(native_error)
|
||||
)
|
||||
raise _unsupported_blob_pandas_error(reason) from native_error
|
||||
return tbl.to_pandas(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
@@ -1086,6 +1256,19 @@ class LanceQueryBuilder(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _plain_scan_to_pandas(
|
||||
self,
|
||||
blob_mode: BlobMode,
|
||||
**kwargs,
|
||||
) -> Optional["pd.DataFrame"]:
|
||||
query = self.to_query_object()
|
||||
if not _query_is_plain_scan(query):
|
||||
return None
|
||||
|
||||
dataset = self._table.to_lance()
|
||||
scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode))
|
||||
return _scanner_to_pandas(scanner, blob_mode, **kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def to_query_object(self) -> Query:
|
||||
"""Return a serializable representation of the query
|
||||
@@ -2207,7 +2390,11 @@ class AsyncQueryBase(object):
|
||||
Base class for all async queries (take, scan, vector, fts, hybrid)
|
||||
"""
|
||||
|
||||
def __init__(self, inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery]):
|
||||
def __init__(
|
||||
self,
|
||||
inner: Union[LanceQuery, LanceVectorQuery, LanceTakeQuery],
|
||||
table: Optional["AsyncTable"] = None,
|
||||
):
|
||||
"""
|
||||
Construct an AsyncQueryBase
|
||||
|
||||
@@ -2215,6 +2402,7 @@ class AsyncQueryBase(object):
|
||||
[AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query.
|
||||
"""
|
||||
self._inner = inner
|
||||
self._table = table
|
||||
|
||||
def to_query_object(self) -> Query:
|
||||
"""
|
||||
@@ -2357,6 +2545,8 @@ class AsyncQueryBase(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
*,
|
||||
blob_mode: BlobMode = "lazy",
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
@@ -2390,13 +2580,48 @@ class AsyncQueryBase(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how blob columns are returned for plain scan queries.
|
||||
Vector, FTS, hybrid, and other non-native query shapes keep the
|
||||
existing Arrow conversion path and only support blob descriptions.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return (
|
||||
flatten_columns(await self.to_arrow(timeout=timeout), flatten)
|
||||
).to_pandas(**kwargs)
|
||||
_validate_blob_mode(blob_mode)
|
||||
native_error = None
|
||||
tbl = flatten_columns(await self.to_arrow(timeout=timeout), flatten)
|
||||
if _blob_mode_requires_native_pandas(blob_mode, tbl.schema):
|
||||
if flatten is None and timeout is None:
|
||||
try:
|
||||
df = await self._plain_scan_to_pandas(blob_mode, **kwargs)
|
||||
if df is not None:
|
||||
return df
|
||||
except Exception as err:
|
||||
native_error = err
|
||||
reason = (
|
||||
"this query shape cannot use Lance native pandas conversion"
|
||||
if native_error is None
|
||||
else str(native_error)
|
||||
)
|
||||
raise _unsupported_blob_pandas_error(reason) from native_error
|
||||
return tbl.to_pandas(**kwargs)
|
||||
|
||||
async def _plain_scan_to_pandas(
|
||||
self,
|
||||
blob_mode: BlobMode,
|
||||
**kwargs,
|
||||
) -> Optional["pd.DataFrame"]:
|
||||
if self._table is None:
|
||||
return None
|
||||
|
||||
query = self.to_query_object()
|
||||
if not _query_is_plain_scan(query):
|
||||
return None
|
||||
|
||||
dataset = await self._table._to_lance()
|
||||
scanner = dataset.scanner(**_scanner_kwargs_for_query(query, blob_mode))
|
||||
return _scanner_to_pandas(scanner, blob_mode, **kwargs)
|
||||
|
||||
async def to_polars(
|
||||
self,
|
||||
@@ -2503,14 +2728,18 @@ class AsyncStandardQuery(AsyncQueryBase):
|
||||
Base class for "standard" async queries (all but take currently)
|
||||
"""
|
||||
|
||||
def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]):
|
||||
def __init__(
|
||||
self,
|
||||
inner: Union[LanceQuery, LanceVectorQuery],
|
||||
table: Optional["AsyncTable"] = None,
|
||||
):
|
||||
"""
|
||||
Construct an AsyncStandardQuery
|
||||
|
||||
This method is not intended to be called directly. Instead, use the
|
||||
[AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query.
|
||||
"""
|
||||
super().__init__(inner)
|
||||
super().__init__(inner, table)
|
||||
|
||||
def where(self, predicate: Union[str, Expr]) -> Self:
|
||||
"""
|
||||
@@ -2616,14 +2845,14 @@ class AsyncStandardQuery(AsyncQueryBase):
|
||||
|
||||
|
||||
class AsyncQuery(AsyncStandardQuery):
|
||||
def __init__(self, inner: LanceQuery):
|
||||
def __init__(self, inner: LanceQuery, table: Optional["AsyncTable"] = None):
|
||||
"""
|
||||
Construct an AsyncQuery
|
||||
|
||||
This method is not intended to be called directly. Instead, use the
|
||||
[AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query.
|
||||
"""
|
||||
super().__init__(inner)
|
||||
super().__init__(inner, table)
|
||||
self._inner = inner
|
||||
|
||||
@classmethod
|
||||
@@ -2707,10 +2936,11 @@ class AsyncQuery(AsyncStandardQuery):
|
||||
new_self = self._inner.nearest_to(query_vectors[0])
|
||||
for v in query_vectors[1:]:
|
||||
new_self.add_query_vector(v)
|
||||
return AsyncVectorQuery(new_self)
|
||||
return AsyncVectorQuery(new_self, self._table)
|
||||
else:
|
||||
return AsyncVectorQuery(
|
||||
self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector))
|
||||
self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)),
|
||||
self._table,
|
||||
)
|
||||
|
||||
def nearest_to_text(
|
||||
@@ -2743,17 +2973,18 @@ class AsyncQuery(AsyncStandardQuery):
|
||||
|
||||
if isinstance(query, str):
|
||||
return AsyncFTSQuery(
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns})
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns}),
|
||||
self._table,
|
||||
)
|
||||
# FullTextQuery object
|
||||
return AsyncFTSQuery(self._inner.nearest_to_text({"query": query}))
|
||||
return AsyncFTSQuery(self._inner.nearest_to_text({"query": query}), self._table)
|
||||
|
||||
|
||||
class AsyncFTSQuery(AsyncStandardQuery):
|
||||
"""A query for full text search for LanceDB."""
|
||||
|
||||
def __init__(self, inner: LanceFTSQuery):
|
||||
super().__init__(inner)
|
||||
def __init__(self, inner: LanceFTSQuery, table: Optional["AsyncTable"] = None):
|
||||
super().__init__(inner, table)
|
||||
self._inner = inner
|
||||
self._reranker = None
|
||||
|
||||
@@ -2835,10 +3066,11 @@ class AsyncFTSQuery(AsyncStandardQuery):
|
||||
new_self = self._inner.nearest_to(query_vectors[0])
|
||||
for v in query_vectors[1:]:
|
||||
new_self.add_query_vector(v)
|
||||
return AsyncHybridQuery(new_self)
|
||||
return AsyncHybridQuery(new_self, self._table)
|
||||
else:
|
||||
return AsyncHybridQuery(
|
||||
self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector))
|
||||
self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector)),
|
||||
self._table,
|
||||
)
|
||||
|
||||
async def to_batches(
|
||||
@@ -3029,7 +3261,7 @@ class AsyncVectorQueryBase:
|
||||
|
||||
|
||||
class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase):
|
||||
def __init__(self, inner: LanceVectorQuery):
|
||||
def __init__(self, inner: LanceVectorQuery, table: Optional["AsyncTable"] = None):
|
||||
"""
|
||||
Construct an AsyncVectorQuery
|
||||
|
||||
@@ -3039,7 +3271,7 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase):
|
||||
a vector query. Or you can use
|
||||
[AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search]
|
||||
"""
|
||||
super().__init__(inner)
|
||||
super().__init__(inner, table)
|
||||
self._inner = inner
|
||||
self._reranker = None
|
||||
self._query_string = None
|
||||
@@ -3093,10 +3325,13 @@ class AsyncVectorQuery(AsyncStandardQuery, AsyncVectorQueryBase):
|
||||
|
||||
if isinstance(query, str):
|
||||
return AsyncHybridQuery(
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns})
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns}),
|
||||
self._table,
|
||||
)
|
||||
# FullTextQuery object
|
||||
return AsyncHybridQuery(self._inner.nearest_to_text({"query": query}))
|
||||
return AsyncHybridQuery(
|
||||
self._inner.nearest_to_text({"query": query}), self._table
|
||||
)
|
||||
|
||||
async def to_batches(
|
||||
self,
|
||||
@@ -3123,8 +3358,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase):
|
||||
in the `rerank` method to convert the scores to ranks and then normalize them.
|
||||
"""
|
||||
|
||||
def __init__(self, inner: LanceHybridQuery):
|
||||
super().__init__(inner)
|
||||
def __init__(self, inner: LanceHybridQuery, table: Optional["AsyncTable"] = None):
|
||||
super().__init__(inner, table)
|
||||
self._inner = inner
|
||||
self._norm = "score"
|
||||
self._reranker = RRFReranker()
|
||||
@@ -3165,8 +3400,8 @@ class AsyncHybridQuery(AsyncStandardQuery, AsyncVectorQueryBase):
|
||||
max_batch_length: Optional[int] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
) -> AsyncRecordBatchReader:
|
||||
fts_query = AsyncFTSQuery(self._inner.to_fts_query())
|
||||
vec_query = AsyncVectorQuery(self._inner.to_vector_query())
|
||||
fts_query = AsyncFTSQuery(self._inner.to_fts_query(), self._table)
|
||||
vec_query = AsyncVectorQuery(self._inner.to_vector_query(), self._table)
|
||||
|
||||
# save the row ID choice that was made on the query builder and force it
|
||||
# to actually fetch the row ids because we need this for reranking
|
||||
@@ -3266,8 +3501,15 @@ class AsyncTakeQuery(AsyncQueryBase):
|
||||
Builder for parameterizing and executing take queries.
|
||||
"""
|
||||
|
||||
def __init__(self, inner: LanceTakeQuery):
|
||||
super().__init__(inner)
|
||||
def __init__(self, inner: LanceTakeQuery, table: Optional["AsyncTable"] = None):
|
||||
super().__init__(inner, table)
|
||||
|
||||
async def _plain_scan_to_pandas(
|
||||
self,
|
||||
blob_mode: BlobMode,
|
||||
**kwargs,
|
||||
) -> Optional["pd.DataFrame"]:
|
||||
return None
|
||||
|
||||
|
||||
class BaseQueryBuilder(object):
|
||||
@@ -3400,6 +3642,8 @@ class BaseQueryBuilder(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
*,
|
||||
blob_mode: BlobMode = "lazy",
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
@@ -3433,11 +3677,15 @@ class BaseQueryBuilder(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how blob columns are returned for plain scan queries.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs))
|
||||
return LOOP.run(
|
||||
self._inner.to_pandas(flatten, timeout, blob_mode=blob_mode, **kwargs)
|
||||
)
|
||||
|
||||
def to_polars(
|
||||
self,
|
||||
|
||||
@@ -89,6 +89,18 @@ from .index import lang_mapping
|
||||
|
||||
BlobMode = Literal["lazy", "bytes", "descriptions"]
|
||||
|
||||
|
||||
def _field_is_blob(field: pa.Field) -> bool:
|
||||
metadata = field.metadata or {}
|
||||
return metadata.get(b"lance-encoding:blob") == b"true" or (
|
||||
metadata.get("lance-encoding:blob") == "true"
|
||||
)
|
||||
|
||||
|
||||
def _schema_has_blob_field(schema: pa.Schema) -> bool:
|
||||
return any(_field_is_blob(field) for field in schema)
|
||||
|
||||
|
||||
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
|
||||
_MODEL_BACKED_TOKENIZER_ERRORS = (
|
||||
"unknown base tokenizer",
|
||||
@@ -2270,9 +2282,13 @@ class LanceTable(Table):
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
if blob_mode == "lazy" and (
|
||||
self._namespace_client is not None
|
||||
or get_uri_scheme(self._dataset_path) == "memory"
|
||||
if blob_mode == "descriptions" or not _schema_has_blob_field(self.schema):
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
if (
|
||||
blob_mode == "lazy"
|
||||
and self._namespace_client is None
|
||||
and get_uri_scheme(self._dataset_path) == "memory"
|
||||
):
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
@@ -4280,7 +4296,7 @@ class AsyncTable:
|
||||
can be executed with methods like [to_arrow][lancedb.query.AsyncQuery.to_arrow],
|
||||
[to_pandas][lancedb.query.AsyncQuery.to_pandas] and more.
|
||||
"""
|
||||
return AsyncQuery(self._inner.query())
|
||||
return AsyncQuery(self._inner.query(), self)
|
||||
|
||||
async def _to_lance(self, **kwargs) -> lance.LanceDataset:
|
||||
try:
|
||||
@@ -4312,7 +4328,12 @@ class AsyncTable:
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
if blob_mode == "lazy":
|
||||
if blob_mode == "descriptions" or not _schema_has_blob_field(
|
||||
await self.schema()
|
||||
):
|
||||
return (await self.to_arrow()).to_pandas(**kwargs)
|
||||
|
||||
if blob_mode == "lazy" and get_uri_scheme(await self.uri()) == "memory":
|
||||
return (await self.to_arrow()).to_pandas(**kwargs)
|
||||
return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
|
||||
@@ -5349,7 +5370,7 @@ class AsyncTable:
|
||||
pa.RecordBatch
|
||||
A record batch containing the rows at the given offsets.
|
||||
"""
|
||||
return AsyncTakeQuery(self._inner.take_offsets(offsets))
|
||||
return AsyncTakeQuery(self._inner.take_offsets(offsets), self)
|
||||
|
||||
def take_row_ids(self, row_ids: list[int]) -> AsyncTakeQuery:
|
||||
"""
|
||||
@@ -5378,7 +5399,7 @@ class AsyncTable:
|
||||
AsyncTakeQuery
|
||||
A query object that can be executed to get the rows.
|
||||
"""
|
||||
return AsyncTakeQuery(self._inner.take_row_ids(row_ids))
|
||||
return AsyncTakeQuery(self._inner.take_row_ids(row_ids), self)
|
||||
|
||||
@property
|
||||
def tags(self) -> AsyncTags:
|
||||
|
||||
@@ -76,6 +76,35 @@ class TestNamespaceConnection:
|
||||
assert len(result) == 0
|
||||
assert list(result.columns) == ["id", "vector", "text"]
|
||||
|
||||
def test_table_to_pandas_blob_lazy_through_namespace(self):
|
||||
"""Namespace-backed tables should use Lance blob-aware pandas conversion."""
|
||||
pytest.importorskip("lance")
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
db.create_namespace(["test_ns"])
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob",
|
||||
pa.large_binary(),
|
||||
metadata={"lance-encoding:blob": "true"},
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
table = db.create_table("blob_table", data, namespace_path=["test_ns"])
|
||||
df = table.to_pandas(blob_mode="lazy").sort_values("id")
|
||||
|
||||
blob = df["blob"].iloc[0]
|
||||
assert hasattr(blob, "readall")
|
||||
assert blob.readall() == b"hello"
|
||||
|
||||
def test_open_table_through_namespace(self):
|
||||
"""Test opening an existing table through namespace."""
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
|
||||
@@ -39,6 +39,35 @@ from utils import exception_output
|
||||
from importlib.util import find_spec
|
||||
|
||||
|
||||
def _blob_query_data():
|
||||
return pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2, 3, 4], pa.int64()),
|
||||
"tag": pa.array(["drop", "keep", "keep", "keep"], pa.utf8()),
|
||||
"vector": pa.array(
|
||||
[[1.0, 0.0], [2.0, 0.0], [3.0, 0.0], [4.0, 0.0]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
"blob": pa.array([b"one", b"two", b"three", b"four"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field("tag", pa.utf8()),
|
||||
pa.field("vector", pa.list_(pa.float32(), list_size=2)),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _assert_lazy_blob(value, expected: bytes):
|
||||
assert hasattr(value, "readall")
|
||||
assert value.readall() == expected
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def table(tmpdir_factory) -> lancedb.table.Table:
|
||||
tmp_path = str(tmpdir_factory.mktemp("data"))
|
||||
@@ -181,6 +210,97 @@ async def test_query_to_pandas_kwargs(table, table_async):
|
||||
assert async_df["id"].tolist() == [1, 2]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"])
|
||||
def test_plain_scan_query_to_pandas_blob_modes(tmp_db, blob_mode):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
f"test_query_to_pandas_blob_{blob_mode}", _blob_query_data()
|
||||
)
|
||||
|
||||
df = (
|
||||
table.search()
|
||||
.select(["id", "blob"])
|
||||
.where("id = 1")
|
||||
.to_pandas(blob_mode=blob_mode)
|
||||
)
|
||||
|
||||
assert df["id"].tolist() == [1]
|
||||
if blob_mode == "lazy":
|
||||
_assert_lazy_blob(df["blob"].iloc[0], b"one")
|
||||
elif blob_mode == "bytes":
|
||||
assert df["blob"].tolist() == [b"one"]
|
||||
else:
|
||||
first = df["blob"].iloc[0]
|
||||
assert first != b"one"
|
||||
assert not hasattr(first, "readall")
|
||||
|
||||
|
||||
def test_plain_scan_query_to_pandas_blob_projection(tmp_db):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
"test_query_to_pandas_blob_projection", _blob_query_data()
|
||||
)
|
||||
|
||||
df = (
|
||||
table.search()
|
||||
.where("id >= 2")
|
||||
.select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"})
|
||||
.limit(2)
|
||||
.offset(1)
|
||||
.to_pandas(blob_mode="bytes")
|
||||
)
|
||||
|
||||
assert df["id_alias"].tolist() == [3, 4]
|
||||
assert df["payload"].tolist() == [b"three", b"four"]
|
||||
assert df["double_id"].tolist() == [6, 8]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_plain_scan_query_to_pandas_blob_projection(tmp_db_async):
|
||||
pytest.importorskip("lance")
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_query_to_pandas_blob_projection", _blob_query_data()
|
||||
)
|
||||
|
||||
lazy_df = await (
|
||||
table.query().where("id = 1").select(["id", "blob"]).to_pandas(blob_mode="lazy")
|
||||
)
|
||||
assert lazy_df["id"].tolist() == [1]
|
||||
_assert_lazy_blob(lazy_df["blob"].iloc[0], b"one")
|
||||
|
||||
bytes_df = await (
|
||||
table.query()
|
||||
.where("id >= 2")
|
||||
.select({"id_alias": "id", "payload": "blob", "double_id": "id * 2"})
|
||||
.limit(2)
|
||||
.offset(1)
|
||||
.to_pandas(blob_mode="bytes")
|
||||
)
|
||||
assert bytes_df["id_alias"].tolist() == [3, 4]
|
||||
assert bytes_df["payload"].tolist() == [b"three", b"four"]
|
||||
assert bytes_df["double_id"].tolist() == [6, 8]
|
||||
|
||||
desc_df = await (
|
||||
table.query()
|
||||
.where("id = 1")
|
||||
.select(["blob"])
|
||||
.to_pandas(blob_mode="descriptions")
|
||||
)
|
||||
first = desc_df["blob"].iloc[0]
|
||||
assert first != b"one"
|
||||
assert not hasattr(first, "readall")
|
||||
|
||||
|
||||
def test_vector_query_to_pandas_blob_mode_requires_native_path(tmp_db):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table("test_vector_query_blob_mode", _blob_query_data())
|
||||
|
||||
with pytest.raises(RuntimeError, match="Lance native pandas conversion"):
|
||||
table.search([1.0, 0.0]).select(["blob", "vector"]).limit(1).to_pandas(
|
||||
blob_mode="lazy"
|
||||
)
|
||||
|
||||
|
||||
def test_order_by_plain_query(mem_db):
|
||||
table = mem_db.create_table(
|
||||
"test_order_by",
|
||||
|
||||
@@ -26,6 +26,28 @@ from lancedb.table import LanceTable
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
def _blob_test_data():
|
||||
return pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _assert_lazy_blob(value, expected: bytes):
|
||||
assert hasattr(value, "readall")
|
||||
assert value.readall() == expected
|
||||
|
||||
|
||||
def test_basic(mem_db: DBConnection):
|
||||
data = [
|
||||
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
|
||||
@@ -57,27 +79,22 @@ def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
|
||||
pd.testing.assert_frame_equal(table.to_pandas(), expected)
|
||||
|
||||
|
||||
def test_table_to_pandas_blob_bytes(tmp_db: DBConnection):
|
||||
@pytest.mark.parametrize("blob_mode", ["lazy", "bytes", "descriptions"])
|
||||
def test_table_to_pandas_blob_modes(tmp_db: DBConnection, blob_mode):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data)
|
||||
table = tmp_db.create_table(f"test_to_pandas_blob_{blob_mode}", _blob_test_data())
|
||||
|
||||
df = table.to_pandas(blob_mode="bytes")
|
||||
df = table.to_pandas(blob_mode=blob_mode)
|
||||
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
if blob_mode == "lazy":
|
||||
_assert_lazy_blob(df["blob"].iloc[0], b"hello")
|
||||
_assert_lazy_blob(df["blob"].iloc[1], b"world")
|
||||
elif blob_mode == "bytes":
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
else:
|
||||
first = df["blob"].iloc[0]
|
||||
assert first != b"hello"
|
||||
assert not hasattr(first, "readall")
|
||||
|
||||
|
||||
def test_table_to_pandas_kwargs(tmp_db: DBConnection):
|
||||
@@ -93,22 +110,8 @@ def test_table_to_pandas_kwargs(tmp_db: DBConnection):
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_to_pandas_blob_bytes", data=data
|
||||
"test_async_to_pandas_blob_bytes", data=_blob_test_data()
|
||||
)
|
||||
|
||||
df = await table.to_pandas(blob_mode="bytes")
|
||||
|
||||
Reference in New Issue
Block a user