Files
lancedb/python/python/lancedb/table.py
yaommen a813ce2f71 fix(python): sanitize bad vectors before Arrow cast (#3158)
## Problem

`on_bad_vectors="drop"` is supposed to remove invalid vector rows before
write, but for some schema-defined vector columns it can still fail
later during Arrow cast instead of dropping the bad row.

Repro:
```python
class MySchema(LanceModel):
    text: str
    embedding: Vector(16)

table = db.create_table("test", schema=MySchema)
table.add(
    [
        {"text": "hello", "embedding": []},
        {"text": "bar", "embedding": [0.1] * 16},
    ],
    on_bad_vectors="drop",
)
```
Before:
```
RuntimeError
Arrow error: C Data interface error: Invalid: ListType can only be casted to FixedSizeListType if the lists are all the expected size.
```
After:
```
rows 1
texts ['bar']
```
## Solution

Make bad-vector sanitization use schema dimensions before cast, while
keeping the handling scoped to vector columns identified by schema
metadata or existing vector-name heuristics.

This also preserves existing integer vector inputs and avoids applying
on_bad_vectors to unrelated fixed-size float columns.


Fixes #1670

Signed-off-by: yaommen <myanstu@163.com>
2026-04-08 09:09:41 -07:00

5257 lines
183 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from __future__ import annotations
import asyncio
import inspect
import deprecation
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Tuple,
Union,
overload,
)
from urllib.parse import urlparse
from lancedb.scannable import _register_optional_converters, to_scannable
from . import __version__
from lancedb.arrow import peek_reader
from lancedb.background_loop import LOOP
from .dependencies import (
_check_for_hugging_face,
_check_for_lance,
_check_for_pandas,
lance,
pandas as pd,
polars as pl,
)
import pyarrow as pa
import pyarrow.dataset
import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import numpy as np
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .index import (
BTree,
IvfFlat,
IvfPq,
IvfSq,
Bitmap,
IvfRq,
LabelList,
HnswPq,
HnswSq,
FTS,
)
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
from .query import (
AsyncFTSQuery,
AsyncHybridQuery,
AsyncQuery,
AsyncTakeQuery,
AsyncVectorQuery,
FullTextQuery,
LanceEmptyQueryBuilder,
LanceFtsQueryBuilder,
LanceHybridQueryBuilder,
LanceQueryBuilder,
LanceVectorQueryBuilder,
LanceTakeQueryBuilder,
Query,
)
from .util import (
add_note,
fs_from_uri,
get_uri_scheme,
infer_vector_column_name,
join_uri,
value_to_sql,
)
from .index import lang_mapping
if TYPE_CHECKING:
from .db import LanceDBConnection
from ._lancedb import (
Table as LanceDBTable,
OptimizeStats,
CleanupStats,
CompactionStats,
Tag,
AddColumnsResult,
AddResult,
AlterColumnsResult,
DeleteResult,
DropColumnsResult,
MergeResult,
UpdateResult,
)
from .index import IndexConfig
import pandas
import PIL
from .types import (
QueryType,
OnBadVectorsType,
AddMode,
CreateMode,
VectorIndexType,
ScalarIndexType,
BaseTokenizerType,
DistanceType,
)
def _into_pyarrow_reader(
data, schema: Optional[pa.Schema] = None
) -> pa.RecordBatchReader:
from lancedb.dependencies import datasets
if _check_for_hugging_face(data):
if isinstance(data, datasets.Dataset):
schema = data.features.arrow_schema
return pa.RecordBatchReader.from_batches(schema, data.data.to_batches())
elif isinstance(data, datasets.dataset_dict.DatasetDict):
schema = _schema_from_hf(data, None)
if "split" not in schema.names:
schema = schema.append(pa.field("split", pa.string()))
return pa.RecordBatchReader.from_batches(
schema, _to_batches_with_split(data)
)
if isinstance(data, LanceModel):
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
if isinstance(data, dict):
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
if isinstance(data, list):
# Handle empty list case
if not data:
if schema is None:
raise ValueError("Cannot create table from empty list without a schema")
return pa.Table.from_pylist(data, schema=schema).to_reader()
# convert to list of dict if data is a bunch of LanceModels
if isinstance(data[0], LanceModel):
schema = data[0].__class__.to_arrow_schema()
data = [model_to_dict(d) for d in data]
return pa.Table.from_pylist(data, schema=schema).to_reader()
elif isinstance(data[0], pa.RecordBatch):
return pa.Table.from_batches(data).to_reader()
else:
return pa.Table.from_pylist(data).to_reader()
elif _check_for_pandas(data) and isinstance(data, pd.DataFrame):
table = pa.Table.from_pandas(data, preserve_index=False)
# Do not serialize Pandas metadata
meta = table.schema.metadata if table.schema.metadata is not None else {}
meta = {k: v for k, v in meta.items() if k != b"pandas"}
return table.replace_schema_metadata(meta).to_reader()
elif isinstance(data, pa.Table):
return data.to_reader()
elif isinstance(data, pa.RecordBatch):
return pa.RecordBatchReader.from_batches(data.schema, [data])
elif _check_for_lance(data) and isinstance(data, lance.LanceDataset):
return data.scanner().to_reader()
elif isinstance(data, pa.dataset.Dataset):
return data.scanner().to_reader()
elif isinstance(data, pa.dataset.Scanner):
return data.to_reader()
elif isinstance(data, pa.RecordBatchReader):
return data
elif (
type(data).__module__.startswith("polars")
and data.__class__.__name__ == "DataFrame"
):
return data.to_arrow().to_reader()
elif (
type(data).__module__.startswith("polars")
and data.__class__.__name__ == "LazyFrame"
):
return data.collect().to_arrow().to_reader()
elif isinstance(data, Iterable):
return _iterator_to_reader(data)
else:
raise TypeError(
f"Unknown data type {type(data)}. "
"Supported types: list of dicts, pandas DataFrame, polars DataFrame, "
"pyarrow Table/RecordBatch, or Pydantic models. "
"See https://lancedb.com/docs/tables/ for examples."
)
def _iterator_to_reader(data: Iterable) -> pa.RecordBatchReader:
# Each batch is treated as it's own reader, mainly so we can
# re-use the _into_pyarrow_reader logic.
first = _into_pyarrow_reader(next(data))
schema = first.schema
def gen():
yield from first
for batch in data:
table: pa.Table = _into_pyarrow_reader(batch).read_all()
if table.schema != schema:
try:
table = table.cast(schema)
except pa.lib.ArrowInvalid:
raise ValueError(
f"Input iterator yielded a batch with schema that "
f"does not match the schema of other batches.\n"
f"Expected:\n{schema}\nGot:\n{batch.schema}"
)
yield from table.to_batches()
return pa.RecordBatchReader.from_batches(schema, gen())
def _sanitize_data(
data: "DATA",
target_schema: Optional[pa.Schema] = None,
metadata: Optional[dict] = None, # embedding metadata
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
*,
allow_subschema: bool = False,
) -> pa.RecordBatchReader:
"""
Handle input data, applying all standard transformations.
This includes:
* Converting the data to a PyArrow Table
* Adding vector columns defined in the metadata
* Adding embedding metadata into the schema
* Casting the table to the target schema
* Handling bad vectors
Parameters
----------
target_schema : Optional[pa.Schema], default None
The schema to cast the table to. This is typically the schema of the table
if it already exists. Otherwise it might be a user-requested schema.
allow_subschema : bool, default False
If True, the input table is allowed to omit columns from the target schema.
The target schema will be filtered to only include columns that are present
in the input table before casting.
metadata : Optional[dict], default None
The embedding metadata to add to the schema.
on_bad_vectors : Literal["error", "drop", "fill", "null"], default "error"
What to do if any of the vectors are not the same size or contains NaNs.
fill_value : float, default 0.0
The value to use when filling vectors. Only used if on_bad_vectors="fill".
All entries in the vector will be set to this value.
"""
# At this point, the table might not match the schema we are targeting:
# 1. There might be embedding columns missing that will be added
# in the add_embeddings step.
# 2. If `allow_subschemas` is True, there might be columns missing.
reader = _into_pyarrow_reader(data, target_schema)
reader = _append_vector_columns(reader, target_schema, metadata=metadata)
# This happens before the cast so we can fix vector columns with
# incorrect lengths before they are cast to FSL.
reader = _handle_bad_vectors(
reader,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
target_schema=target_schema,
metadata=metadata,
)
if target_schema is None:
target_schema, reader = _infer_target_schema(reader)
if metadata:
target_schema = target_schema.with_metadata(
_merge_metadata(target_schema.metadata, metadata)
)
_validate_schema(target_schema)
reader = _cast_to_target_schema(reader, target_schema, allow_subschema)
return reader
def _cast_to_target_schema(
reader: pa.RecordBatchReader,
target_schema: pa.Schema,
allow_subschema: bool = False,
) -> pa.RecordBatchReader:
# pa.Table.cast expects field order not to be changed.
# Lance doesn't care about field order, so we don't need to rearrange fields
# to match the target schema. We just need to correctly cast the fields.
if reader.schema.equals(target_schema, check_metadata=True):
# Fast path when the schemas are already the same
return reader
fields = _align_field_types(list(iter(reader.schema)), list(iter(target_schema)))
reordered_schema = pa.schema(fields, metadata=target_schema.metadata)
if not allow_subschema and len(reordered_schema) != len(target_schema):
raise ValueError(
"Input table has different number of columns than target schema"
)
if allow_subschema and len(reordered_schema) != len(target_schema):
fields = _infer_subschema(
list(iter(reader.schema)), list(iter(reordered_schema))
)
reordered_schema = pa.schema(fields, metadata=target_schema.metadata)
def gen():
for batch in reader:
# Table but not RecordBatch has cast.
cast_batches = (
pa.Table.from_batches([batch]).cast(reordered_schema).to_batches()
)
if cast_batches:
yield pa.RecordBatch.from_arrays(
cast_batches[0].columns, schema=reordered_schema
)
return pa.RecordBatchReader.from_batches(reordered_schema, gen())
def _align_field_types(
fields: List[pa.Field],
target_fields: List[pa.Field],
) -> List[pa.Field]:
"""
Apply the data types from the target_fields to the fields.
"""
new_fields = []
for field in fields:
target_field = next((f for f in target_fields if f.name == field.name), None)
if target_field is None:
raise ValueError(f"Field '{field.name}' not found in target schema")
if pa.types.is_struct(target_field.type):
if pa.types.is_struct(field.type):
new_type = pa.struct(
_align_field_types(
field.type.fields,
target_field.type.fields,
)
)
else:
new_type = target_field.type
elif pa.types.is_list(target_field.type):
if _is_list_like(field.type):
new_type = pa.list_(
_align_field_types(
[field.type.value_field],
[target_field.type.value_field],
)[0]
)
else:
new_type = target_field.type
elif pa.types.is_large_list(target_field.type):
if _is_list_like(field.type):
new_type = pa.large_list(
_align_field_types(
[field.type.value_field],
[target_field.type.value_field],
)[0]
)
else:
new_type = target_field.type
elif pa.types.is_fixed_size_list(target_field.type):
if _is_list_like(field.type):
new_type = pa.list_(
_align_field_types(
[field.type.value_field],
[target_field.type.value_field],
)[0],
target_field.type.list_size,
)
else:
new_type = target_field.type
else:
new_type = target_field.type
new_fields.append(
pa.field(field.name, new_type, field.nullable, target_field.metadata)
)
return new_fields
def _infer_subschema(
schema: List[pa.Field],
reference_fields: List[pa.Field],
) -> List[pa.Field]:
"""
Transform the list of fields so the types match the reference_fields.
The order of the fields is preserved.
``schema`` may have fewer fields than `reference_fields`, but it may not have
more fields.
"""
fields = []
lookup = {f.name: f for f in reference_fields}
for field in schema:
reference = lookup.get(field.name)
if reference is None:
raise ValueError("Unexpected field in schema: {}".format(field))
if pa.types.is_struct(reference.type):
new_type = pa.struct(
_infer_subschema(
field.type.fields,
reference.type.fields,
)
)
new_field = pa.field(
field.name,
new_type,
reference.nullable,
)
else:
new_field = reference
fields.append(new_field)
return fields
def sanitize_create_table(
data,
schema: Union[pa.Schema, LanceModel],
metadata=None,
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
):
if inspect.isclass(schema) and issubclass(schema, LanceModel):
# convert LanceModel to pyarrow schema
# note that it's possible this contains
# embedding function metadata already
schema: pa.Schema = schema.to_arrow_schema()
if data is not None:
if metadata is None and schema is not None:
metadata = schema.metadata
data = _sanitize_data(
data,
schema,
metadata=metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
schema = data.schema
else:
if schema is not None:
data = pa.Table.from_pylist([], schema)
if schema is None:
if data is None:
raise ValueError("Either data or schema must be provided")
elif hasattr(data, "schema"):
schema = data.schema
if metadata:
metadata = _merge_metadata(schema.metadata, metadata)
schema = schema.with_metadata(metadata)
# Need to apply metadata to the data as well
if isinstance(data, pa.Table):
data = data.replace_schema_metadata(metadata)
elif isinstance(data, pa.RecordBatchReader):
data = pa.RecordBatchReader.from_batches(schema, data)
return data, schema
def _schema_from_hf(data, schema) -> pa.Schema:
"""
Extract pyarrow schema from HuggingFace DatasetDict
and validate that they're all the same schema between
splits
"""
for dataset in data.values():
if schema is None:
schema = dataset.features.arrow_schema
elif schema != dataset.features.arrow_schema:
msg = "All datasets in a HuggingFace DatasetDict must have the same schema"
raise TypeError(msg)
return schema
def _to_batches_with_split(data):
"""
Return a generator of RecordBatches from a HuggingFace DatasetDict
with an extra `split` column
"""
for key, dataset in data.items():
for batch in dataset.data.to_batches():
table = pa.Table.from_batches([batch])
if "split" not in table.column_names:
table = table.append_column(
"split", pa.array([key] * batch.num_rows, pa.string())
)
for b in table.to_batches():
yield b
def _append_vector_columns(
reader: pa.RecordBatchReader,
schema: Optional[pa.Schema] = None,
*,
metadata: Optional[dict] = None,
) -> pa.RecordBatchReader:
"""
Use the embedding function to automatically embed the source columns and add the
vector columns to the table.
"""
if schema is None:
metadata = _merge_metadata(metadata)
else:
metadata = _merge_metadata(schema.metadata, metadata)
functions = EmbeddingFunctionRegistry.get_instance().parse_functions(metadata)
if not functions:
return reader
fields = list(reader.schema)
for vector_column, conf in functions.items():
if vector_column not in reader.schema.names:
if schema is not None:
field = schema.field(vector_column)
else:
dtype = pa.list_(pa.float32(), conf.function.ndims())
field = pa.field(vector_column, type=dtype, nullable=True)
fields.append(field)
schema = pa.schema(fields, metadata=reader.schema.metadata)
def gen():
for batch in reader:
for vector_column, conf in functions.items():
func = conf.function
no_vector_column = vector_column not in batch.column_names
if no_vector_column or pc.all(pc.is_null(batch[vector_column])).as_py():
col_data = func.compute_source_embeddings_with_retry(
batch[conf.source_column]
)
if no_vector_column:
batch = batch.append_column(
schema.field(vector_column),
pa.array(col_data, type=schema.field(vector_column).type),
)
else:
batch = batch.set_column(
batch.column_names.index(vector_column),
schema.field(vector_column),
pa.array(col_data, type=schema.field(vector_column).type),
)
yield batch
return pa.RecordBatchReader.from_batches(schema, gen())
def _table_path(base: str, table_name: str) -> str:
"""
Get a table path that can be used in PyArrow FS.
Removes any weird schemes (such as "s3+ddb") and drops any query params.
"""
uri = _table_uri(base, table_name)
# Parse as URL
parsed = urlparse(uri)
# If scheme is s3+ddb, convert to s3
if parsed.scheme == "s3+ddb":
parsed = parsed._replace(scheme="s3")
# Remove query parameters
return parsed._replace(query=None).geturl()
def _table_uri(base: str, table_name: str) -> str:
return join_uri(base, f"{table_name}.lance")
def _normalize_progress(progress):
"""Normalize a ``progress`` parameter for :meth:`Table.add`.
Returns ``(progress_obj, owns)`` where *owns* is True when we created a
tqdm bar that the caller must close.
"""
if progress is True:
from tqdm.auto import tqdm
return tqdm(unit=" rows"), True
if progress is False or progress is None:
return None, False
return progress, False
class Table(ABC):
"""
A Table is a collection of Records in a LanceDB Database.
Examples
--------
Create using [DBConnection.create_table][lancedb.DBConnection.create_table]
(more examples in that method's documentation).
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2}])
>>> table.head()
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
b: int64
----
vector: [[[1.1,1.2]]]
b: [[2]]
Can append new data with [Table.add()][lancedb.table.Table.add].
>>> table.add([{"vector": [0.5, 1.3], "b": 4}])
AddResult(version=2)
Can query the table with [Table.search][lancedb.table.Table.search].
>>> table.search([0.4, 0.4]).select(["b", "vector"]).to_pandas()
b vector _distance
0 4 [0.5, 1.3] 0.82
1 2 [1.1, 1.2] 1.13
Search queries are much faster when an index is created. See
[Table.create_index][lancedb.table.Table.create_index].
"""
@property
@abstractmethod
def name(self) -> str:
"""The name of this Table"""
raise NotImplementedError
@property
@abstractmethod
def version(self) -> int:
"""The version of this Table"""
raise NotImplementedError
@property
@abstractmethod
def schema(self) -> pa.Schema:
"""The [Arrow Schema](https://arrow.apache.org/docs/python/api/datatypes.html#)
of this Table
"""
raise NotImplementedError
@property
@abstractmethod
def tags(self) -> Tags:
"""Tag management for the table.
Similar to Git, tags are a way to add metadata to a specific version of the
table.
.. warning::
Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
Examples
--------
.. code-block:: python
table = db.open_table("my_table")
table.tags.create("v2-prod-20250203", 10)
tags = table.tags.list()
"""
raise NotImplementedError
def __len__(self) -> int:
"""The number of rows in this Table"""
return self.count_rows(None)
@property
@abstractmethod
def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
"""
Get a mapping from vector column name to it's configured embedding function.
"""
@abstractmethod
def count_rows(self, filter: Optional[str] = None) -> int:
"""
Count the number of rows in the table.
Parameters
----------
filter: str, optional
A SQL where clause to filter the rows to count.
"""
raise NotImplementedError
def to_pandas(self) -> "pandas.DataFrame":
"""Return the table as a pandas DataFrame.
Returns
-------
pd.DataFrame
"""
return self.to_arrow().to_pandas()
@abstractmethod
def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
Returns
-------
pa.Table
"""
raise NotImplementedError
def to_lance(self, **kwargs) -> lance.LanceDataset:
"""Return the table as a lance.LanceDataset.
Returns
-------
lance.LanceDataset
"""
raise NotImplementedError
def to_polars(self, **kwargs) -> "pl.DataFrame":
"""Return the table as a polars.DataFrame.
Returns
-------
polars.DataFrame
"""
raise NotImplementedError
def create_index(
self,
metric="l2",
num_partitions=256,
num_sub_vectors=96,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
index_cache_size: Optional[int] = None,
*,
index_type: VectorIndexType = "IVF_PQ",
wait_timeout: Optional[timedelta] = None,
num_bits: int = 8,
max_iterations: int = 50,
sample_rate: int = 256,
m: int = 20,
ef_construction: int = 300,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on the table.
Parameters
----------
metric: str, default "l2"
The distance metric to use when creating the index.
Valid values are "l2", "cosine", "dot", or "hamming".
l2 is euclidean distance.
Hamming is available only for binary vectors.
num_partitions: int, default 256
The number of IVF partitions to use when creating the index.
Default is 256.
num_sub_vectors: int, default 96
The number of PQ sub-vectors to use when creating the index.
Default is 96.
vector_column_name: str, default "vector"
The vector column name to create the index.
replace: bool, default True
- If True, replace the existing index if it exists.
- If False, raise an error if duplicate index exists.
accelerator: str, default None
If set, use the given accelerator to create the index.
Only support "cuda" for now.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
num_bits: int
The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
Only 4 and 8 are supported.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
train: bool, default True
Whether to train the index with existing data. Vector indices always train
with existing data.
"""
raise NotImplementedError
def drop_index(self, name: str) -> None:
"""
Drop an index from the table.
Parameters
----------
name: str
The name of the index to drop.
Notes
-----
This does not delete the index from disk, it just removes it from the table.
To delete the index, run [optimize][lancedb.table.Table.optimize]
after dropping the index.
Use [list_indices][lancedb.table.Table.list_indices] to find the names of
the indices.
"""
raise NotImplementedError
def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
"""
Wait for indexing to complete for the given index names.
This will poll the table until all the indices are fully indexed,
or raise a timeout exception if the timeout is reached.
Parameters
----------
index_names: str
The name of the indices to poll
timeout: timedelta
Timeout to wait for asynchronous indexing. The default is 5 minutes.
"""
raise NotImplementedError
@abstractmethod
def stats(self) -> TableStatistics:
"""
Retrieve table and fragment statistics.
"""
raise NotImplementedError
@abstractmethod
def create_scalar_index(
self,
column: str,
*,
replace: bool = True,
index_type: ScalarIndexType = "BTREE",
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Create a scalar index on a column.
Parameters
----------
column : str
The column to be indexed. Must be a boolean, integer, float,
or string column.
replace : bool, default True
Replace the existing index if it exists.
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"], default "BTREE"
The type of index to create.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
Examples
--------
Scalar indices, like vector indices, can be used to speed up scans. A scalar
index can speed up scans that contain filter expressions on the indexed column.
For example, the following scan will be faster if the column ``my_col`` has
a scalar index:
>>> import lancedb # doctest: +SKIP
>>> db = lancedb.connect("/data/lance") # doctest: +SKIP
>>> img_table = db.open_table("images") # doctest: +SKIP
>>> my_df = img_table.search().where("my_col = 7", # doctest: +SKIP
... prefilter=True).to_pandas()
Scalar indices can also speed up scans containing a vector search and a
prefilter:
>>> import lancedb # doctest: +SKIP
>>> db = lancedb.connect("/data/lance") # doctest: +SKIP
>>> img_table = db.open_table("images") # doctest: +SKIP
>>> img_table.search([1, 2, 3, 4], vector_column_name="vector") # doctest: +SKIP
... .where("my_col != 7", prefilter=True)
... .to_pandas()
Scalar indices can only speed up scans for basic filters using
equality, comparison, range (e.g. ``my_col BETWEEN 0 AND 100``), and set
membership (e.g. `my_col IN (0, 1, 2)`)
Scalar indices can be used if the filter contains multiple indexed columns and
the filter criteria are AND'd or OR'd together
(e.g. ``my_col < 0 AND other_col> 100``)
Scalar indices may be used if the filter contains non-indexed columns but,
depending on the structure of the filter, they may not be usable. For example,
if the column ``not_indexed`` does not have a scalar index then the filter
``my_col = 0 OR not_indexed = 1`` will not be able to use any scalar index on
``my_col``.
"""
raise NotImplementedError
def create_fts_index(
self,
field_names: Union[str, List[str]],
*,
ordering_field_names: Optional[Union[str, List[str]]] = None,
replace: bool = False,
writer_heap_size: Optional[int] = 1024 * 1024 * 1024,
use_tantivy: bool = False,
tokenizer_name: Optional[str] = None,
with_position: bool = False,
# tokenizer configs:
base_tokenizer: BaseTokenizerType = "simple",
language: str = "English",
max_token_length: Optional[int] = 40,
lower_case: bool = True,
stem: bool = True,
remove_stop_words: bool = True,
ascii_folding: bool = True,
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Create a full-text search index on the table.
Warning - this API is highly experimental and is highly likely to change
in the future.
Parameters
----------
field_names: str or list of str
The name(s) of the field to index.
If ``use_tantivy`` is False (default), only a single field name
(str) is supported. To index multiple fields, create a separate
FTS index for each field.
replace: bool, default False
If True, replace the existing index if it exists. Note that this is
not yet an atomic operation; the index will be temporarily
unavailable while the new index is being created.
writer_heap_size: int, default 1GB
Only available with use_tantivy=True
ordering_field_names:
A list of unsigned type fields to index to optionally order
results on at search time.
only available with use_tantivy=True
tokenizer_name: str, default "default"
The tokenizer to use for the index. Can be "raw", "default" or the 2 letter
language code followed by "_stem". So for english it would be "en_stem".
For available languages see: https://docs.rs/tantivy/latest/tantivy/tokenizer/enum.Language.html
use_tantivy: bool, default False
If True, use the legacy full-text search implementation based on tantivy.
If False, use the new full-text search implementation based on lance-index.
with_position: bool, default False
Only available with use_tantivy=False
If False, do not store the positions of the terms in the text.
This can reduce the size of the index and improve indexing speed.
But it will raise an exception for phrase queries.
base_tokenizer : str, default "simple"
The base tokenizer to use for tokenization. Options are:
- "simple": Splits text by whitespace and punctuation.
- "whitespace": Split text by whitespace, but not punctuation.
- "raw": No tokenization. The entire text is treated as a single token.
- "ngram": N-Gram tokenizer.
language : str, default "English"
The language to use for tokenization.
max_token_length : int, default 40
The maximum token length to index. Tokens longer than this length will be
ignored.
lower_case : bool, default True
Whether to convert the token to lower case. This makes queries
case-insensitive.
stem : bool, default True
Whether to stem the token. Stemming reduces words to their root form.
For example, in English "running" and "runs" would both be reduced to "run".
remove_stop_words : bool, default True
Whether to remove stop words. Stop words are common words that are often
removed from text before indexing. For example, in English "the" and "and".
ascii_folding : bool, default True
Whether to fold ASCII characters. This converts accented characters to
their ASCII equivalent. For example, "café" would be converted to "cafe".
ngram_min_length: int, default 3
The minimum length of an n-gram.
ngram_max_length: int, default 3
The maximum length of an n-gram.
prefix_only: bool, default False
Whether to only index the prefix of the token for ngram tokenizer.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
"""
raise NotImplementedError
@abstractmethod
def add(
self,
data: DATA,
mode: AddMode = "append",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
progress: Optional[Union[bool, Callable, Any]] = None,
) -> AddResult:
"""Add more data to the [Table](Table).
Parameters
----------
data: DATA
The data to insert into the table. Acceptable types are:
- list-of-dict
- pandas.DataFrame
- pyarrow.Table or pyarrow.RecordBatch
mode: str
The mode to use when writing the data. Valid values are
"append" and "overwrite".
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
progress: bool, callable, or tqdm-like, optional
Progress reporting during the add operation. Can be:
- ``True`` to automatically create and display a tqdm progress
bar (requires ``tqdm`` to be installed)::
table.add(data, progress=True)
- A **callable** that receives a dict with keys ``output_rows``,
``output_bytes``, ``total_rows``, ``elapsed_seconds``,
``active_tasks``, ``total_tasks``, and ``done``::
def on_progress(p):
print(f"{p['output_rows']}/{p['total_rows']} rows, "
f"{p['active_tasks']}/{p['total_tasks']} workers")
table.add(data, progress=on_progress)
- A **tqdm-compatible** progress bar whose ``total`` and
``update()`` will be called automatically. The postfix shows
write throughput (MB/s) and active worker count::
with tqdm() as pbar:
table.add(data, progress=pbar)
Returns
-------
AddResult
An object containing the new version number of the table after adding data.
"""
raise NotImplementedError
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation
This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")
The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.
"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data) "Not matched by source" records exist only
in the target table (this is old data)
The builder returned by this method can be used to customize what
should happen for each category of data.
Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.
Parameters
----------
on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.
Examples
--------
>>> import lancedb
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> res = table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
""" # noqa: E501
on = [on] if isinstance(on, str) else list(iter(on))
return LanceMergeInsertBuilder(self, on)
@abstractmethod
def search(
self,
query: Optional[
Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]
] = None,
vector_column_name: Optional[str] = None,
query_type: QueryType = "auto",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceQueryBuilder:
"""Create a search query to find the nearest neighbors
of the given query vector. We currently support [vector search][search]
and [full-text search][experimental-full-text-search].
All query options are defined in
[LanceQueryBuilder][lancedb.query.LanceQueryBuilder].
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> data = [
... {"original_width": 100, "caption": "bar", "vector": [0.1, 2.3, 4.5]},
... {"original_width": 2000, "caption": "foo", "vector": [0.5, 3.4, 1.3]},
... {"original_width": 3000, "caption": "test", "vector": [0.3, 6.2, 2.6]}
... ]
>>> table = db.create_table("my_table", data)
>>> query = [0.4, 1.4, 2.4]
>>> (table.search(query)
... .where("original_width > 1000", prefilter=True)
... .select(["caption", "original_width", "vector"])
... .limit(2)
... .to_pandas())
caption original_width vector _distance
0 foo 2000 [0.5, 3.4, 1.3] 5.220000
1 test 3000 [0.3, 6.2, 2.6] 23.089996
Parameters
----------
query: list/np.ndarray/str/PIL.Image.Image, default None
The targetted vector to search for.
- *default None*.
Acceptable types are: list, np.ndarray, PIL.Image.Image
- If None then the select/where/limit clauses are applied to filter
the table
vector_column_name: str, optional
The name of the vector column to search.
The vector column needs to be a pyarrow fixed size list type
- If not specified then the vector column is inferred from
the table schema
- If the table has multiple vector columns then the *vector_column_name*
needs to be specified. Otherwise, an error is raised.
query_type: str
*default "auto"*.
Acceptable types are: "vector", "fts", "hybrid", or "auto"
- If "auto" then the query type is inferred from the query;
- If `query` is a list/np.ndarray then the query type is
"vector";
- If `query` is a PIL.Image.Image then either do vector search,
or raise an error if no corresponding embedding function is found.
- If `query` is a string, then the query type is "vector" if the
table has embedding functions else the query type is "fts"
Returns
-------
LanceQueryBuilder
A query builder object representing the query.
Once executed, the query returns
- selected columns
- the vector
- and also the "_distance" column which is the distance between the query
vector and the returned vector.
"""
raise NotImplementedError
@abstractmethod
def take_offsets(
self, offsets: list[int], *, with_row_id: bool = False
) -> LanceTakeQueryBuilder:
"""
Take a list of offsets from the table.
Offsets are 0-indexed and relative to the current version of the table. Offsets
are not stable. A row with an offset of N may have a different offset in a
different version of the table (e.g. if an earlier row is deleted).
Offsets are mostly useful for sampling as the set of all valid offsets is easily
known in advance to be [0, len(table)).
No guarantees are made regarding the order in which results are returned. If
you desire an output order that matches the order of the given offsets, you will
need to add the row offset column to the output and align it yourself.
Parameters
----------
offsets: list[int]
The offsets to take.
Returns
-------
pa.RecordBatch
A record batch containing the rows at the given offsets.
"""
def __getitems__(self, offsets: list[int]) -> pa.RecordBatch:
"""
Take a list of offsets from the table and return as a record batch.
This method uses the `take_offsets` method to take the rows. However, it
aligns the offsets to the passed in offsets. This means the return type
is a record batch (and so users should take care not to pass in too many
offsets)
Note: this method is primarily intended to fulfill the Dataset contract
for pytorch.
Parameters
----------
offsets: list[int]
The offsets to take.
Returns
-------
pa.RecordBatch
A record batch containing the rows at the given offsets.
"""
# We don't know the order of the results at all. So we calculate a permutation
# for ordering the given offsets. Then we load the data with the _rowoffset
# column. Then we sort by _rowoffset and apply the inverse of the permutation
# that we calculated.
#
# Note: this is potentially a lot of memory copy if we're operating on large
# batches :(
num_offsets = len(offsets)
indices = list(range(num_offsets))
permutation = sorted(indices, key=lambda idx: offsets[idx])
permutation_inv = [0] * num_offsets
for i in range(num_offsets):
permutation_inv[permutation[i]] = i
columns = self.schema.names
columns.append("_rowoffset")
tbl = (
self.take_offsets(offsets)
.select(columns)
.to_arrow()
.sort_by("_rowoffset")
.take(permutation_inv)
.combine_chunks()
.drop_columns(["_rowoffset"])
)
return tbl
@abstractmethod
def take_row_ids(
self, row_ids: list[int], *, with_row_id: bool = False
) -> LanceTakeQueryBuilder:
"""
Take a list of row ids from the table.
Row ids are not stable and are relative to the current version of the table.
They can change due to compaction and updates.
No guarantees are made regarding the order in which results are returned. If
you desire an output order that matches the order of the given ids, you will
need to add the row id column to the output and align it yourself.
Unlike offsets, row ids are not 0-indexed and no assumptions should be made
about the possible range of row ids. In order to use this method you must
first obtain the row ids by scanning or searching the table.
Even so, row ids are more stable than offsets and can be useful in some
situations.
There is an ongoing effort to make row ids stable which is tracked at
https://github.com/lancedb/lancedb/issues/1120
Parameters
----------
row_ids: list[int]
The row ids to take.
Returns
-------
AsyncTakeQuery
A query object that can be executed to get the rows.
"""
@abstractmethod
def _execute_query(
self,
query: Query,
*,
batch_size: Optional[int] = None,
timeout: Optional[timedelta] = None,
) -> pa.RecordBatchReader: ...
@abstractmethod
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str: ...
@abstractmethod
def _analyze_plan(self, query: Query) -> str: ...
@abstractmethod
def _output_schema(self, query: Query) -> pa.Schema: ...
@abstractmethod
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
) -> MergeResult: ...
@abstractmethod
def delete(self, where: str) -> DeleteResult:
"""Delete rows from the table.
This can be used to delete a single row, many rows, all rows, or
sometimes no rows (if your predicate matches nothing).
Parameters
----------
where: str
The SQL where clause to use when deleting rows.
- For example, 'x = 2' or 'x IN (1, 2, 3)'.
The filter must not be empty, or it will error.
Returns
-------
DeleteResult
An object containing the new version number of the table after deletion.
Examples
--------
>>> import lancedb
>>> data = [
... {"x": 1, "vector": [1.0, 2]},
... {"x": 2, "vector": [3.0, 4]},
... {"x": 3, "vector": [5.0, 6]}
... ]
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 3 [5.0, 6.0]
If you have a list of values to delete, you can combine them into a
stringified list and use the `IN` operator:
>>> to_remove = [1, 5]
>>> to_remove = ", ".join([str(v) for v in to_remove])
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]
"""
raise NotImplementedError
@abstractmethod
def update(
self,
where: Optional[str] = None,
values: Optional[dict] = None,
*,
values_sql: Optional[Dict[str, str]] = None,
) -> UpdateResult:
"""
This can be used to update zero to all rows depending on how many
rows match the where clause. If no where clause is provided, then
all rows will be updated.
Either `values` or `values_sql` must be provided. You cannot provide
both.
Parameters
----------
where: str, optional
The SQL where clause to use when updating rows. For example, 'x = 2'
or 'x IN (1, 2, 3)'. The filter must not be empty, or it will error.
values: dict, optional
The values to update. The keys are the column names and the values
are the values to set.
values_sql: dict, optional
The values to update, expressed as SQL expression strings. These can
reference existing columns. For example, {"x": "x + 1"} will increment
the x column by 1.
Returns
-------
UpdateResult
- rows_updated: The number of rows that were updated
- version: The new version number of the table after the update
Examples
--------
>>> import lancedb
>>> import pandas as pd
>>> data = pd.DataFrame({"x": [1, 2, 3], "vector": [[1.0, 2], [3, 4], [5, 6]]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.update(where="x = 2", values={"vector": [10.0, 10]})
UpdateResult(rows_updated=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 3 [5.0, 6.0]
2 2 [10.0, 10.0]
>>> table.update(values_sql={"x": "x + 1"})
UpdateResult(rows_updated=3, version=3)
>>> table.to_pandas()
x vector
0 2 [1.0, 2.0]
1 4 [5.0, 6.0]
2 3 [10.0, 10.0]
"""
raise NotImplementedError
@abstractmethod
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
Parameters
----------
older_than: timedelta, default None
The minimum age of the version to delete. If None, then this defaults
to two weeks.
delete_unverified: bool, default False
Because they may be part of an in-progress transaction, files newer
than 7 days old are not deleted by default. If you are sure that
there are no in-progress transactions, then you can set this to True
to delete all files older than `older_than`.
Returns
-------
CleanupStats
The stats of the cleanup operation, including how many bytes were
freed.
See Also
--------
[Table.optimize][lancedb.table.Table.optimize]: A more comprehensive
optimization operation that includes cleanup as well as other operations.
Notes
-----
This function is not available in LanceDb Cloud (since LanceDB
Cloud manages cleanup for you automatically)
"""
@abstractmethod
def compact_files(self, *args, **kwargs):
"""
Run the compaction process on the table.
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed onto Lance's
[compact_files][lance.dataset.DatasetOptimizer.compact_files].
For most cases, the default should be fine.
See Also
--------
[Table.optimize][lancedb.table.Table.optimize]: A more comprehensive
optimization operation that includes cleanup as well as other operations.
Notes
-----
This function is not available in LanceDB Cloud (since LanceDB
Cloud manages compaction for you automatically)
"""
@abstractmethod
def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
retrain: bool = False,
):
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
in-progress operation (e.g. appending new data) and these files will not
be deleted unless they are at least 7 days old. If delete_unverified is True
then these files will be deleted regardless of their age.
.. warning::
This should only be set to True if you can guarantee that no other
process is currently working on this dataset. Otherwise the dataset
could be put into a corrupted state.
retrain: bool, default False
This parameter is no longer used and is deprecated.
The frequency an application should call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
@abstractmethod
def list_indices(self) -> Iterable[IndexConfig]:
"""
List all indices that have been created with
[Table.create_index][lancedb.table.Table.create_index]
"""
@abstractmethod
def index_stats(self, index_name: str) -> Optional[IndexStatistics]:
"""
Retrieve statistics about an index
Parameters
----------
index_name: str
The name of the index to retrieve statistics for
Returns
-------
IndexStatistics or None
The statistics about the index. Returns None if the index does not exist.
"""
@abstractmethod
def add_columns(
self, transforms: Dict[str, str] | pa.Field | List[pa.Field] | pa.Schema
):
"""
Add new columns with defined values.
Parameters
----------
transforms: Dict[str, str], pa.Field, List[pa.Field], pa.Schema
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
Alternatively, a pyarrow Field or Schema can be provided to add
new columns with the specified data types. The new columns will
be initialized with null values.
Returns
-------
AddColumnsResult
version: the new version number of the table after adding columns.
"""
@abstractmethod
def alter_columns(self, *alterations: Iterable[Dict[str, str]]):
"""
Alter column names and nullability.
Parameters
----------
alterations : Iterable[Dict[str, Any]]
A sequence of dictionaries, each with the following keys:
- "path": str
The column path to alter. For a top-level column, this is the name.
For a nested column, this is the dot-separated path, e.g. "a.b.c".
- "rename": str, optional
The new name of the column. If not specified, the column name is
not changed.
- "data_type": pyarrow.DataType, optional
The new data type of the column. Existing values will be casted
to this type. If not specified, the column data type is not changed.
- "nullable": bool, optional
Whether the column should be nullable. If not specified, the column
nullability is not changed. Only non-nullable columns can be changed
to nullable. Currently, you cannot change a nullable column to
non-nullable.
Returns
-------
AlterColumnsResult
version: the new version number of the table after the alteration.
"""
@abstractmethod
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
"""
Drop columns from the table.
Parameters
----------
columns : Iterable[str]
The names of the columns to drop.
Returns
-------
DropColumnsResult
version: the new version number of the table dropping the columns.
"""
@abstractmethod
def checkout(self, version: Union[int, str]):
"""
Checks out a specific version of the Table
Any read operation on the table will now access the data at the checked out
version. As a consequence, calling this method will disable any read consistency
interval that was previously set.
This is a read-only operation that turns the table into a sort of "view"
or "detached head". Other table instances will not be affected. To make the
change permanent you can use the `[Self::restore]` method.
Any operation that modifies the table will fail while the table is in a checked
out state.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
To return the table to a normal state use `[Self::checkout_latest]`
"""
@abstractmethod
def checkout_latest(self):
"""
Ensures the table is pointing at the latest version
This can be used to manually update a table when the read_consistency_interval
is None
It can also be used to undo a `[Self::checkout]` operation
"""
@abstractmethod
def restore(self, version: Optional[Union[int, str]] = None):
"""Restore a version of the table. This is an in-place operation.
This creates a new version where the data is equivalent to the
specified previous version. Data is not copied (as of python-v0.2.1).
Parameters
----------
version : int or str, default None
The version number or version tag to restore.
If unspecified then restores the currently checked out version.
If the currently checked out version is the
latest version then this is a no-op.
"""
@abstractmethod
def list_versions(self) -> List[Dict[str, Any]]:
"""List all versions of the table"""
@cached_property
def _dataset_uri(self) -> str:
return _table_uri(self._conn.uri, self.name)
def _get_fts_index_path(self) -> Tuple[str, pa_fs.FileSystem, bool]:
from .remote.table import RemoteTable
if isinstance(self, RemoteTable) or get_uri_scheme(self._dataset_uri) != "file":
return ("", None, False)
path = join_uri(self._dataset_uri, "_indices", "fts")
fs, path = fs_from_uri(path)
index_exists = fs.get_file_info(path).type != pa_fs.FileType.NotFound
return (path, fs, index_exists)
@abstractmethod
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
Returns
-------
bool
True if the table is using the new v2 manifest paths, False otherwise.
"""
@abstractmethod
def migrate_v2_manifest_paths(self):
"""
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without
changing the state of the object store.
!!! danger
This should not be run while other concurrent operations are happening.
And it should also run until completion before resuming other operations.
You can use
[Table.uses_v2_manifest_paths][lancedb.table.Table.uses_v2_manifest_paths]
to check if the table is already using the new path style.
"""
class LanceTable(Table):
"""
A table in a LanceDB database.
This can be opened in two modes: standard and time-travel.
Standard mode is the default. In this mode, the table is mutable and tracks
the latest version of the table. The level of read consistency is controlled
by the `read_consistency_interval` parameter on the connection.
Time-travel mode is activated by specifying a version number. In this mode,
the table is immutable and fixed to a specific version. This is useful for
querying historical versions of the table.
"""
def __init__(
self,
connection: "LanceDBConnection",
name: str,
*,
namespace_path: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
pushdown_operations: Optional[set] = None,
_async: AsyncTable = None,
):
if namespace_path is None:
namespace_path = []
self._conn = connection
self._namespace_path = namespace_path
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
if _async is not None:
self._table = _async
else:
self._table = LOOP.run(
connection._conn.open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
)
)
@property
def name(self) -> str:
return self._table.name
@property
def namespace(self) -> List[str]:
"""Return the namespace path of the table."""
return self._namespace_path
@property
def id(self) -> str:
"""Return the full identifier of the table (namespace$name)."""
if self._namespace_path:
return "$".join(self._namespace_path + [self.name])
return self.name
@classmethod
def from_inner(cls, tbl: LanceDBTable):
from .db import LanceDBConnection
async_tbl = AsyncTable(tbl)
conn = LanceDBConnection.from_inner(tbl.database())
return cls(
conn,
async_tbl.name,
_async=async_tbl,
)
@classmethod
def open(
cls,
db,
name,
*,
namespace_path: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
pushdown_operations: Optional[set] = None,
):
if namespace_path is None:
namespace_path = []
tbl = cls(
db,
name,
namespace_path=namespace_path,
storage_options=storage_options,
index_cache_size=index_cache_size,
location=location,
namespace_client=namespace_client,
managed_versioning=managed_versioning,
pushdown_operations=pushdown_operations,
)
# check the dataset exists
try:
tbl.version
except ValueError as e:
if "Not found:" in str(e):
raise FileNotFoundError(f"Table {name} does not exist")
raise e
return tbl
@cached_property
def _dataset_path(self) -> str:
# Cacheable since it's deterministic
# If table was opened with explicit location (e.g., from namespace),
# use that location directly instead of constructing from base URI
if self._location is not None:
return self._location
return _table_path(self._conn.uri, self.name)
def to_lance(self, **kwargs) -> lance.LanceDataset:
"""Return the LanceDataset backing this table."""
try:
import lance
except ImportError:
raise ImportError(
"The lance library is required to use this function. "
"Please install with `pip install pylance`."
)
if self._namespace_client is not None:
table_id = self._namespace_path + [self.name]
return lance.dataset(
version=self.version,
storage_options=self._conn.storage_options,
namespace_client=self._namespace_client,
table_id=table_id,
**kwargs,
)
return lance.dataset(
self._dataset_path,
version=self.version,
storage_options=self._conn.storage_options,
**kwargs,
)
@property
def schema(self) -> pa.Schema:
"""Return the schema of the table.
Returns
-------
pa.Schema
A PyArrow schema object."""
return LOOP.run(self._table.schema())
def list_versions(self) -> List[Dict[str, Any]]:
"""List all versions of the table"""
return LOOP.run(self._table.list_versions())
@property
def version(self) -> int:
"""Get the current version of the table"""
return LOOP.run(self._table.version())
def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder:
return LanceTakeQueryBuilder(self._table.take_offsets(offsets))
def take_row_ids(self, row_ids: list[int]) -> LanceTakeQueryBuilder:
return LanceTakeQueryBuilder(self._table.take_row_ids(row_ids))
@property
def tags(self) -> Tags:
"""Tag management for the table.
Similar to Git, tags are a way to add metadata to a specific version of the
table.
.. warning::
Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
Returns
-------
Tags
The tag manager for managing tags for the table.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table",
... [{"vector": [1.1, 0.9], "type": "vector"}])
>>> table.tags.create("v1", table.version)
>>> table.add([{"vector": [0.5, 0.2], "type": "vector"}])
AddResult(version=2)
>>> tags = table.tags.list()
>>> print(tags["v1"]["version"])
1
>>> table.checkout("v1")
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
"""
return Tags(self._table)
def checkout(self, version: Union[int, str]):
"""Checkout a version of the table. This is an in-place operation.
This allows viewing previous versions of the table. If you wish to
keep writing to the dataset starting from an old version, then use
the `restore` function.
Calling this method will set the table into time-travel mode. If you
wish to return to standard mode, call `checkout_latest`.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table",
... [{"vector": [1.1, 0.9], "type": "vector"}])
>>> table.version
1
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
>>> table.add([{"vector": [0.5, 0.2], "type": "vector"}])
AddResult(version=2)
>>> table.version
2
>>> table.checkout(1)
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
"""
LOOP.run(self._table.checkout(version))
def checkout_latest(self):
"""Checkout the latest version of the table. This is an in-place operation.
The table will be set back into standard mode, and will track the latest
version of the table.
"""
LOOP.run(self._table.checkout_latest())
def restore(self, version: Optional[Union[int, str]] = None):
"""Restore a version of the table. This is an in-place operation.
This creates a new version where the data is equivalent to the
specified previous version. Data is not copied (as of python-v0.2.1).
Parameters
----------
version : int or str, default None
The version number or version tag to restore.
If unspecified then restores the currently checked out version.
If the currently checked out version is the
latest version then this is a no-op.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", [
... {"vector": [1.1, 0.9], "type": "vector"}])
>>> table.version
1
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
>>> table.add([{"vector": [0.5, 0.2], "type": "vector"}])
AddResult(version=2)
>>> table.version
2
>>> table.tags.create("v2", 2)
>>> table.restore(1)
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
>>> len(table.list_versions())
3
>>> table.restore("v2")
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
1 [0.5, 0.2] vector
>>> len(table.list_versions())
4
"""
if version is not None:
LOOP.run(self._table.checkout(version))
LOOP.run(self._table.restore())
def count_rows(self, filter: Optional[str] = None) -> int:
return LOOP.run(self._table.count_rows(filter))
def __repr__(self) -> str:
val = f"{self.__class__.__name__}(name={self.name!r}, version={self.version}"
if self._conn.read_consistency_interval is not None:
val += ", read_consistency_interval={!r}".format(
self._conn.read_consistency_interval
)
val += f", _conn={self._conn!r})"
return val
def __str__(self) -> str:
return self.__repr__()
def head(self, n=5) -> pa.Table:
"""Return the first n rows of the table."""
return LOOP.run(self._table.head(n))
def to_pandas(self) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Returns
-------
pd.DataFrame
"""
return self.to_arrow().to_pandas()
def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
Returns
-------
pa.Table"""
return LOOP.run(self._table.to_arrow())
def to_polars(self, batch_size=None) -> "pl.LazyFrame":
"""Return the table as a polars LazyFrame.
Parameters
----------
batch_size: int, optional
Passed to polars. This is the maximum row count for
scanned pyarrow record batches
Note
----
1. This requires polars to be installed separately
2. Currently we've disabled push-down of the filters from polars
because polars pushdown into pyarrow uses pyarrow compute
expressions rather than SQl strings (which LanceDB supports)
Returns
-------
pl.LazyFrame
"""
from lancedb.integrations.pyarrow import PyarrowDatasetAdapter
dataset = PyarrowDatasetAdapter(self)
return pl.scan_pyarrow_dataset(
dataset, allow_pyarrow_filter=False, batch_size=batch_size
)
def create_index(
self,
metric: DistanceType = "l2",
num_partitions=None,
num_sub_vectors=None,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
index_cache_size: Optional[int] = None,
num_bits: int = 8,
index_type: Literal[
"IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_RQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = "IVF_PQ",
max_iterations: int = 50,
sample_rate: int = 256,
m: int = 20,
ef_construction: int = 300,
*,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on the table."""
if accelerator is not None:
# accelerator is only supported through pylance.
self.to_lance().create_index(
column=vector_column_name,
index_type=index_type,
metric=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
replace=replace,
accelerator=accelerator,
index_cache_size=index_cache_size,
num_bits=num_bits,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
)
self.checkout_latest()
return
elif index_type == "IVF_FLAT":
config = IvfFlat(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
)
elif index_type == "IVF_SQ":
config = IvfSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
)
elif index_type == "IVF_PQ":
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
)
elif index_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
)
elif index_type == "IVF_HNSW_PQ":
config = HnswPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
)
elif index_type == "IVF_HNSW_SQ":
config = HnswSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
)
else:
raise ValueError(f"Unknown index type {index_type}")
return LOOP.run(
self._table.create_index(
vector_column_name,
replace=replace,
config=config,
name=name,
train=train,
)
)
def drop_index(self, name: str) -> None:
"""
Drops an index from the table
Parameters
----------
name: str
The name of the index to drop
"""
return LOOP.run(self._table.drop_index(name))
def prewarm_index(self, name: str) -> None:
"""
Prewarm an index in the table.
This is a hint to the database that the index will be accessed in the
future and should be loaded into memory if possible. This can reduce
cold-start latency for subsequent queries.
This call initiates prewarming and returns once the request is accepted.
It is idempotent and safe to call from multiple clients concurrently.
It is generally wasteful to call this if the index does not fit into the
available cache. Not all index types support prewarming; unsupported
indices will silently ignore the request.
Parameters
----------
name: str
The name of the index to prewarm
"""
return LOOP.run(self._table.prewarm_index(name))
def prewarm_data(self, columns: Optional[List[str]] = None) -> None:
"""
Prewarm data for the table.
This is a hint to the database that the given columns will be accessed
in the future and the database should prefetch the data if possible.
Currently only supported on remote tables.
This call initiates prewarming and returns once the request is accepted.
It is idempotent and safe to call from multiple clients concurrently.
This operation has a large upfront cost but can speed up future queries
that need to fetch the given columns. Large columns such as embeddings
or binary data may not be practical to prewarm. This feature is intended
for workloads that issue many queries against the same columns.
Parameters
----------
columns: list of str, optional
The columns to prewarm. If None, all columns are prewarmed.
"""
return LOOP.run(self._table.prewarm_data(columns))
def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
return LOOP.run(self._table.wait_for_index(index_names, timeout))
def stats(self) -> TableStatistics:
return LOOP.run(self._table.stats())
@property
def uri(self) -> str:
return LOOP.run(self._table.uri())
def initial_storage_options(self) -> Optional[Dict[str, str]]:
"""Get the initial storage options that were passed in when opening this table.
For dynamically refreshed options (e.g., credential vending), use
:meth:`latest_storage_options`.
Warning: This is an internal API and the return value is subject to change.
Returns
-------
Optional[Dict[str, str]]
The storage options, or None if no storage options were configured.
"""
return LOOP.run(self._table.initial_storage_options())
def latest_storage_options(self) -> Optional[Dict[str, str]]:
"""Get the latest storage options, refreshing from provider if configured.
This method is useful for credential vending scenarios where storage options
may be refreshed dynamically. If no dynamic provider is configured, this
returns the initial static options.
Warning: This is an internal API and the return value is subject to change.
Returns
-------
Optional[Dict[str, str]]
The storage options, or None if no storage options were configured.
"""
return LOOP.run(self._table.latest_storage_options())
def create_scalar_index(
self,
column: str,
*,
replace: bool = True,
index_type: ScalarIndexType = "BTREE",
name: Optional[str] = None,
):
if index_type == "BTREE":
config = BTree()
elif index_type == "BITMAP":
config = Bitmap()
elif index_type == "LABEL_LIST":
config = LabelList()
else:
raise ValueError(f"Unknown index type {index_type}")
return LOOP.run(
self._table.create_index(column, replace=replace, config=config, name=name)
)
def create_fts_index(
self,
field_names: Union[str, List[str]],
*,
ordering_field_names: Optional[Union[str, List[str]]] = None,
replace: bool = False,
writer_heap_size: Optional[int] = 1024 * 1024 * 1024,
use_tantivy: bool = False,
tokenizer_name: Optional[str] = None,
with_position: bool = False,
# tokenizer configs:
base_tokenizer: BaseTokenizerType = "simple",
language: str = "English",
max_token_length: Optional[int] = 40,
lower_case: bool = True,
stem: bool = True,
remove_stop_words: bool = True,
ascii_folding: bool = True,
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
name: Optional[str] = None,
):
if not use_tantivy:
if not isinstance(field_names, str):
raise ValueError(
"Native FTS indexes can only be created on a single field "
"at a time. To search over multiple text fields, create a "
"separate FTS index for each field."
)
if tokenizer_name is None:
tokenizer_configs = {
"base_tokenizer": base_tokenizer,
"language": language,
"with_position": with_position,
"max_token_length": max_token_length,
"lower_case": lower_case,
"stem": stem,
"remove_stop_words": remove_stop_words,
"ascii_folding": ascii_folding,
"ngram_min_length": ngram_min_length,
"ngram_max_length": ngram_max_length,
"prefix_only": prefix_only,
}
else:
tokenizer_configs = self.infer_tokenizer_configs(tokenizer_name)
config = FTS(
**tokenizer_configs,
)
# delete the existing legacy index if it exists
if replace:
path, fs, exist = self._get_fts_index_path()
if exist:
fs.delete_dir(path)
LOOP.run(
self._table.create_index(
field_names,
replace=replace,
config=config,
name=name,
)
)
return
from .fts import create_index, populate_index
if isinstance(field_names, str):
field_names = [field_names]
if isinstance(ordering_field_names, str):
ordering_field_names = [ordering_field_names]
path, fs, exist = self._get_fts_index_path()
if exist:
if not replace:
raise ValueError("Index already exists. Use replace=True to overwrite.")
fs.delete_dir(path)
if not isinstance(fs, pa_fs.LocalFileSystem):
raise NotImplementedError(
"Full-text search is only supported on the local filesystem"
)
if tokenizer_name is None:
tokenizer_name = "default"
index = create_index(
path,
field_names,
ordering_fields=ordering_field_names,
tokenizer_name=tokenizer_name,
)
populate_index(
index,
self,
field_names,
ordering_fields=ordering_field_names,
writer_heap_size=writer_heap_size,
)
@staticmethod
def infer_tokenizer_configs(tokenizer_name: str) -> dict:
if tokenizer_name == "default":
return {
"base_tokenizer": "simple",
"language": "English",
"max_token_length": 40,
"lower_case": True,
"stem": False,
"remove_stop_words": False,
"ascii_folding": False,
"ngram_min_length": 3,
"ngram_max_length": 3,
"prefix_only": False,
}
elif tokenizer_name == "raw":
return {
"base_tokenizer": "raw",
"language": "English",
"max_token_length": None,
"lower_case": False,
"stem": False,
"remove_stop_words": False,
"ascii_folding": False,
"ngram_min_length": 3,
"ngram_max_length": 3,
"prefix_only": False,
}
elif tokenizer_name == "whitespace":
return {
"base_tokenizer": "whitespace",
"language": "English",
"max_token_length": None,
"lower_case": False,
"stem": False,
"remove_stop_words": False,
"ascii_folding": False,
"ngram_min_length": 3,
"ngram_max_length": 3,
"prefix_only": False,
}
# or it's with language stemming with pattern like "en_stem"
if len(tokenizer_name) != 7:
raise ValueError(f"Invalid tokenizer name {tokenizer_name}")
lang = tokenizer_name[:2]
if tokenizer_name[-5:] != "_stem":
raise ValueError(f"Invalid tokenizer name {tokenizer_name}")
if lang not in lang_mapping:
raise ValueError(f"Invalid language code {lang}")
return {
"base_tokenizer": "simple",
"language": lang_mapping[lang],
"max_token_length": 40,
"lower_case": True,
"stem": True,
"remove_stop_words": False,
"ascii_folding": False,
"ngram_min_length": 3,
"ngram_max_length": 3,
"prefix_only": False,
}
def add(
self,
data: DATA,
mode: AddMode = "append",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
progress: Optional[Union[bool, Callable, Any]] = None,
) -> AddResult:
"""Add data to the table.
If vector columns are missing and the table
has embedding functions, then the vector columns
are automatically computed and added.
Parameters
----------
data: list-of-dict, pd.DataFrame
The data to insert into the table.
mode: str
The mode to use when writing the data. Valid values are
"append" and "overwrite".
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill", "null".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
progress: bool, callable, or tqdm-like, optional
A callback or tqdm-compatible progress bar. See
:meth:`Table.add` for details.
Returns
-------
int
The number of vectors in the table.
"""
progress, owns = _normalize_progress(progress)
try:
return LOOP.run(
self._table.add(
data,
mode=mode,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
progress=progress,
)
)
finally:
if owns:
progress.close()
def merge(
self,
other_table: Union[LanceTable, DATA],
left_on: str,
right_on: Optional[str] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
):
"""Merge another table into this table.
Performs a left join, where the dataset is the left side and other_table
is the right side. Rows existing in the dataset but not on the left will
be filled with null values, unless Lance doesn't support null values for
some types, in which case an error will be raised. The only overlapping
column allowed is the join column. If other overlapping columns exist,
an error will be raised.
Parameters
----------
other_table: LanceTable or Reader-like
The data to be merged. Acceptable types are:
- Pandas DataFrame, Pyarrow Table, Dataset, Scanner,
Iterator[RecordBatch], or RecordBatchReader
- LanceTable
left_on: str
The name of the column in the dataset to join on.
right_on: str or None
The name of the column in other_table to join on. If None, defaults to
left_on.
schema: pa.Schema or LanceModel, optional
The schema of the other_table.
If not provided, the schema is inferred from the data.
Examples
--------
>>> import lancedb
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("dataset", df)
>>> table.to_pandas()
x y
0 1 a
1 2 b
2 3 c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> table.merge(new_df, 'x')
>>> table.to_pandas()
x y z
0 1 a d
1 2 b e
2 3 c f
"""
if isinstance(other_table, LanceTable):
other_table = other_table.to_lance()
else:
other_table = _sanitize_data(
other_table,
schema,
)
self.to_lance().merge(
other_table, left_on=left_on, right_on=right_on, schema=schema
)
self.checkout_latest()
@cached_property
def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
"""
Get the embedding functions for the table
Returns
-------
funcs: Dict[str, EmbeddingFunctionConfig]
A mapping of the vector column to the embedding function
or empty dict if not configured.
"""
return EmbeddingFunctionRegistry.get_instance().parse_functions(
self.schema.metadata
)
@overload
def search( # type: ignore
self,
query: Optional[Union[VEC, str, "PIL.Image.Image", Tuple]] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["vector"] = "vector",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceVectorQueryBuilder: ...
@overload
def search(
self,
query: Optional[Union[VEC, str, "PIL.Image.Image", Tuple]] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["fts"] = "fts",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceFtsQueryBuilder: ...
@overload
def search(
self,
query: Optional[
Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]
] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["hybrid"] = "hybrid",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceHybridQueryBuilder: ...
@overload
def search(
self,
query: None = None,
vector_column_name: Optional[str] = None,
query_type: QueryType = "auto",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceEmptyQueryBuilder: ...
def search(
self,
query: Optional[
Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]
] = None,
vector_column_name: Optional[str] = None,
query_type: QueryType = "auto",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> LanceQueryBuilder:
"""Create a search query to find the nearest neighbors
of the given query vector. We currently support [vector search][search]
and [full-text search][search].
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> data = [
... {"original_width": 100, "caption": "bar", "vector": [0.1, 2.3, 4.5]},
... {"original_width": 2000, "caption": "foo", "vector": [0.5, 3.4, 1.3]},
... {"original_width": 3000, "caption": "test", "vector": [0.3, 6.2, 2.6]}
... ]
>>> table = db.create_table("my_table", data)
>>> query = [0.4, 1.4, 2.4]
>>> (table.search(query)
... .where("original_width > 1000", prefilter=True)
... .select(["caption", "original_width", "vector"])
... .limit(2)
... .to_pandas())
caption original_width vector _distance
0 foo 2000 [0.5, 3.4, 1.3] 5.220000
1 test 3000 [0.3, 6.2, 2.6] 23.089996
Parameters
----------
query: list/np.ndarray/str/PIL.Image.Image, default None
The targetted vector to search for.
- *default None*.
Acceptable types are: list, np.ndarray, PIL.Image.Image
- If None then the select/[where][sql]/limit clauses are applied
to filter the table
vector_column_name: str, optional
The name of the vector column to search.
The vector column needs to be a pyarrow fixed size list type
*default "vector"*
- If not specified then the vector column is inferred from
the table schema
- If the table has multiple vector columns then the *vector_column_name*
needs to be specified. Otherwise, an error is raised.
query_type: str, default "auto"
"vector", "fts", or "auto"
If "auto" then the query type is inferred from the query;
If `query` is a list/np.ndarray then the query type is "vector";
If `query` is a PIL.Image.Image then either do vector search
or raise an error if no corresponding embedding function is found.
If the `query` is a string, then the query type is "vector" if the
table has embedding functions, else the query type is "fts"
fts_columns: str or list of str, default None
The column(s) to search in for full-text search.
If None then the search is performed on all indexed columns.
For now, only one column can be searched at a time.
Returns
-------
LanceQueryBuilder
A query builder object representing the query.
Once executed, the query returns selected columns, the vector,
and also the "_distance" column which is the distance between the query
vector and the returned vector.
"""
if isinstance(query, FullTextQuery):
query_type = "fts"
vector_column_name = infer_vector_column_name(
schema=self.schema,
query_type=query_type,
query=query,
vector_column_name=vector_column_name,
)
return LanceQueryBuilder.create(
self,
query,
query_type,
vector_column_name=vector_column_name,
ordering_field_name=ordering_field_name,
fts_columns=fts_columns or [],
)
@classmethod
def create(
cls,
db: LanceDBConnection,
name: str,
data: Optional[DATA] = None,
schema: Optional[pa.Schema] = None,
mode: CreateMode = "create",
exist_ok: bool = False,
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace_path: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str | bool]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
pushdown_operations: Optional[set] = None,
):
"""
Create a new table.
Examples
--------
>>> import lancedb
>>> data = [
... {"x": 1, "vector": [1.0, 2]},
... {"x": 2, "vector": [3.0, 4]},
... {"x": 3, "vector": [5.0, 6]}
... ]
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
Parameters
----------
db: LanceDB
The LanceDB instance to create the table in.
name: str
The name of the table to create.
data: list-of-dict, dict, pd.DataFrame, default None
The data to insert into the table.
At least one of `data` or `schema` must be provided.
schema: pa.Schema or LanceModel, optional
The schema of the table. If not provided,
the schema is inferred from the data.
At least one of `data` or `schema` must be provided.
mode: str, default "create"
The mode to use when writing the data. Valid values are
"create", "overwrite", and "append".
exist_ok: bool, default False
If the table already exists then raise an error if False,
otherwise just open the table, it will not add the provided
data but will validate against any schema that's specified.
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill", "null".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
embedding_functions: list of EmbeddingFunctionModel, default None
The embedding functions to use when creating the table.
data_storage_version: optional, str, default "stable"
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_data_storage_version` in the options.
enable_v2_manifest_paths: optional, bool, default False
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_enable_v2_manifest_paths` in the options.
"""
if namespace_path is None:
namespace_path = []
self = cls.__new__(cls)
self._conn = db
self._namespace_path = namespace_path
self._location = location
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
if data_storage_version is not None:
warnings.warn(
"setting data_storage_version directly on create_table is deprecated. ",
"Use database_options instead.",
DeprecationWarning,
)
if storage_options is None:
storage_options = {}
storage_options["new_table_data_storage_version"] = data_storage_version
if enable_v2_manifest_paths is not None:
warnings.warn(
"setting enable_v2_manifest_paths directly on create_table is ",
"deprecated. Use database_options instead.",
DeprecationWarning,
)
if storage_options is None:
storage_options = {}
storage_options["new_table_enable_v2_manifest_paths"] = (
enable_v2_manifest_paths
)
self._table = LOOP.run(
self._conn._conn.create_table(
name,
data,
schema=schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace_path=namespace_path,
storage_options=storage_options,
location=location,
)
)
return self
def delete(self, where: str) -> DeleteResult:
return LOOP.run(self._table.delete(where))
def update(
self,
where: Optional[str] = None,
values: Optional[dict] = None,
*,
values_sql: Optional[Dict[str, str]] = None,
) -> UpdateResult:
"""
This can be used to update zero to all rows depending on how many
rows match the where clause.
Parameters
----------
where: str, optional
The SQL where clause to use when updating rows. For example, 'x = 2'
or 'x IN (1, 2, 3)'. The filter must not be empty, or it will error.
values: dict, optional
The values to update. The keys are the column names and the values
are the values to set.
values_sql: dict, optional
The values to update, expressed as SQL expression strings. These can
reference existing columns. For example, {"x": "x + 1"} will increment
the x column by 1.
Returns
-------
UpdateResult
- rows_updated: The number of rows that were updated
- version: The new version number of the table after the update
Examples
--------
>>> import lancedb
>>> import pandas as pd
>>> data = pd.DataFrame({"x": [1, 2, 3], "vector": [[1.0, 2], [3, 4], [5, 6]]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.update(where="x = 2", values={"vector": [10.0, 10]})
UpdateResult(rows_updated=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 3 [5.0, 6.0]
2 2 [10.0, 10.0]
"""
return LOOP.run(self._table.update(values, where=where, updates_sql=values_sql))
def _execute_query(
self,
query: Query,
*,
batch_size: Optional[int] = None,
timeout: Optional[timedelta] = None,
) -> pa.RecordBatchReader:
if (
"QueryTable" in self._pushdown_operations
and self._namespace_client is not None
):
from lancedb.namespace import _execute_server_side_query
table_id = self._namespace_path + [self.name]
return _execute_server_side_query(self._namespace_client, table_id, query)
async_iter = LOOP.run(
self._table._execute_query(query, batch_size=batch_size, timeout=timeout)
)
def iter_sync():
try:
while True:
yield LOOP.run(async_iter.__anext__())
except StopAsyncIteration:
return
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
return LOOP.run(self._table._explain_plan(query, verbose))
def _analyze_plan(self, query: Query) -> str:
return LOOP.run(self._table._analyze_plan(query))
def _output_schema(self, query: Query) -> pa.Schema:
return LOOP.run(self._table._output_schema(query))
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
) -> MergeResult:
return LOOP.run(
self._table._do_merge(merge, new_data, on_bad_vectors, fill_value)
)
@property
def _inner(self) -> LanceDBTable:
return self._table._inner
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
Parameters
----------
older_than: timedelta, default None
The minimum age of the version to delete. If None, then this defaults
to two weeks.
delete_unverified: bool, default False
Because they may be part of an in-progress transaction, files newer
than 7 days old are not deleted by default. If you are sure that
there are no in-progress transactions, then you can set this to True
to delete all files older than `older_than`.
Returns
-------
CleanupStats
The stats of the cleanup operation, including how many bytes were
freed.
"""
return self.to_lance().cleanup_old_versions(
older_than, delete_unverified=delete_unverified
)
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def compact_files(self, *args, **kwargs) -> CompactionStats:
"""
Run the compaction process on the table.
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed onto `lance.dataset.DatasetOptimizer.compact_files`.
(see Lance documentation for more details) For most cases, the default
should be fine.
"""
stats = self.to_lance().optimize.compact_files(*args, **kwargs)
self.checkout_latest()
return stats
def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
retrain: bool = False,
):
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
in-progress operation (e.g. appending new data) and these files will not
be deleted unless they are at least 7 days old. If delete_unverified is True
then these files will be deleted regardless of their age.
.. warning::
This should only be set to True if you can guarantee that no other
process is currently working on this dataset. Otherwise the dataset
could be put into a corrupted state.
retrain: bool, default False
This parameter is no longer used and is deprecated.
The frequency an application should call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
LOOP.run(
self._table.optimize(
cleanup_older_than=cleanup_older_than,
delete_unverified=delete_unverified,
retrain=retrain,
)
)
def list_indices(self) -> Iterable[IndexConfig]:
"""
List all indices that have been created with Self::create_index
"""
return LOOP.run(self._table.list_indices())
def index_stats(self, index_name: str) -> Optional[IndexStatistics]:
"""
Retrieve statistics about an index
Parameters
----------
index_name: str
The name of the index to retrieve statistics for
Returns
-------
IndexStatistics or None
The statistics about the index. Returns None if the index does not exist.
"""
return LOOP.run(self._table.index_stats(index_name))
def add_columns(
self, transforms: Dict[str, str] | pa.field | List[pa.field] | pa.Schema
) -> AddColumnsResult:
return LOOP.run(self._table.add_columns(transforms))
def alter_columns(
self, *alterations: Iterable[Dict[str, str]]
) -> AlterColumnsResult:
return LOOP.run(self._table.alter_columns(*alterations))
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
Returns
-------
bool
True if the table is using the new v2 manifest paths, False otherwise.
"""
return LOOP.run(self._table.uses_v2_manifest_paths())
def migrate_v2_manifest_paths(self):
"""
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without
changing the state of the object store.
!!! danger
This should not be run while other concurrent operations are happening.
And it should also run until completion before resuming other operations.
You can use
[LanceTable.uses_v2_manifest_paths][lancedb.table.LanceTable.uses_v2_manifest_paths]
to check if the table is already using the new path style.
"""
LOOP.run(self._table.migrate_v2_manifest_paths())
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
LOOP.run(self._table.replace_field_metadata(field_name, new_metadata))
def _handle_bad_vectors(
reader: pa.RecordBatchReader,
on_bad_vectors: Literal["error", "drop", "fill", "null"] = "error",
fill_value: float = 0.0,
target_schema: Optional[pa.Schema] = None,
metadata: Optional[dict] = None,
) -> pa.RecordBatchReader:
vector_columns = _find_vector_columns(reader.schema, target_schema, metadata)
if not vector_columns:
return reader
output_schema = _vector_output_schema(reader.schema, vector_columns)
def gen():
for batch in reader:
pending_dims = []
for vector_column in vector_columns:
dim = vector_column["expected_dim"]
if target_schema is not None and dim is None:
dim = _infer_vector_dim(batch[vector_column["name"]])
pending_dims.append(vector_column)
batch = _handle_bad_vector_column(
batch,
vector_column_name=vector_column["name"],
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
expected_dim=dim,
expected_value_type=vector_column["expected_value_type"],
)
for vector_column in pending_dims:
if vector_column["expected_dim"] is None:
vector_column["expected_dim"] = _infer_vector_dim(
batch[vector_column["name"]]
)
if batch.schema.equals(output_schema, check_metadata=True):
yield batch
continue
cast_batches = (
pa.Table.from_batches([batch]).cast(output_schema).to_batches()
)
if cast_batches:
yield pa.RecordBatch.from_arrays(
cast_batches[0].columns,
schema=output_schema,
)
return pa.RecordBatchReader.from_batches(output_schema, gen())
def _find_vector_columns(
reader_schema: pa.Schema,
target_schema: Optional[pa.Schema],
metadata: Optional[dict],
) -> List[dict]:
if target_schema is None:
vector_columns = []
for field in reader_schema:
named_vector_col = (
_is_list_like(field.type)
and pa.types.is_floating(field.type.value_type)
and field.name == VECTOR_COLUMN_NAME
)
likely_vector_col = (
pa.types.is_fixed_size_list(field.type)
and pa.types.is_floating(field.type.value_type)
and (field.type.list_size >= 10)
)
if named_vector_col or likely_vector_col:
vector_columns.append(
{
"name": field.name,
"expected_dim": None,
"expected_value_type": None,
}
)
return vector_columns
reader_column_names = set(reader_schema.names)
active_metadata = _merge_metadata(target_schema.metadata, metadata)
embedding_function_columns = set(
EmbeddingFunctionRegistry.get_instance().parse_functions(active_metadata).keys()
)
vector_columns = []
for field in target_schema:
if field.name not in reader_column_names:
continue
if not _is_list_like(field.type) or not pa.types.is_floating(
field.type.value_type
):
continue
reader_field = reader_schema.field(field.name)
named_vector_col = (
field.name in embedding_function_columns
or field.name == VECTOR_COLUMN_NAME
or (field.name == "embedding" and pa.types.is_fixed_size_list(field.type))
)
typed_fixed_vector_col = (
pa.types.is_fixed_size_list(reader_field.type)
and pa.types.is_floating(reader_field.type.value_type)
and reader_field.type.list_size >= 10
)
if named_vector_col or typed_fixed_vector_col:
vector_columns.append(
{
"name": field.name,
"expected_dim": (
field.type.list_size
if pa.types.is_fixed_size_list(field.type)
else None
),
"expected_value_type": field.type.value_type,
}
)
return vector_columns
def _vector_output_schema(
reader_schema: pa.Schema,
vector_columns: List[dict],
) -> pa.Schema:
columns_by_name = {column["name"]: column for column in vector_columns}
fields = []
for field in reader_schema:
column = columns_by_name.get(field.name)
if column is None:
output_type = field.type
else:
output_type = _vector_output_type(field, column)
fields.append(pa.field(field.name, output_type, field.nullable, field.metadata))
return pa.schema(fields, metadata=reader_schema.metadata)
def _vector_output_type(field: pa.Field, vector_column: dict) -> pa.DataType:
if not _is_list_like(field.type):
return field.type
if vector_column["expected_value_type"] is not None and (
pa.types.is_null(field.type.value_type)
or pa.types.is_integer(field.type.value_type)
or pa.types.is_unsigned_integer(field.type.value_type)
):
return pa.list_(vector_column["expected_value_type"])
if (
vector_column["expected_dim"] is not None
and pa.types.is_fixed_size_list(field.type)
and field.type.list_size != vector_column["expected_dim"]
):
return pa.list_(field.type.value_type)
return field.type
def _handle_bad_vector_column(
data: pa.RecordBatch,
vector_column_name: str,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
expected_dim: Optional[int] = None,
expected_value_type: Optional[pa.DataType] = None,
) -> pa.RecordBatch:
"""
Ensure that the vector column exists and has type fixed_size_list(float)
Parameters
----------
data: pa.Table
The table to sanitize.
vector_column_name: str
The name of the vector column.
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill", "null".
fill_value: float, default 0.0
The value to use when filling vectors. Only used if on_bad_vectors="fill".
"""
position = data.column_names.index(vector_column_name)
vec_arr = data[vector_column_name]
if not _is_list_like(vec_arr.type):
return data
if (
expected_dim is not None
and pa.types.is_fixed_size_list(vec_arr.type)
and vec_arr.type.list_size != expected_dim
):
vec_arr = pa.array(vec_arr.to_pylist(), type=pa.list_(vec_arr.type.value_type))
data = data.set_column(position, vector_column_name, vec_arr)
if expected_value_type is not None and (
pa.types.is_integer(vec_arr.type.value_type)
or pa.types.is_unsigned_integer(vec_arr.type.value_type)
):
vec_arr = pa.array(vec_arr.to_pylist(), type=pa.list_(expected_value_type))
data = data.set_column(position, vector_column_name, vec_arr)
if pa.types.is_floating(vec_arr.type.value_type):
has_nan = has_nan_values(vec_arr)
else:
has_nan = pa.array([False] * len(vec_arr))
if expected_dim is not None:
dim = expected_dim
elif pa.types.is_fixed_size_list(vec_arr.type):
dim = vec_arr.type.list_size
else:
dim = _infer_vector_dim(vec_arr)
if dim is None:
return data
has_wrong_dim = pc.not_equal(pc.list_value_length(vec_arr), dim)
has_bad_vectors = pc.any(has_nan).as_py() or pc.any(has_wrong_dim).as_py()
if has_bad_vectors:
is_bad = pc.or_(has_nan, has_wrong_dim)
if on_bad_vectors == "error":
if pc.any(has_wrong_dim).as_py():
raise ValueError(
f"Vector column '{vector_column_name}' has variable length "
"vectors. Set on_bad_vectors='drop' to remove them, "
"set on_bad_vectors='fill' and fill_value=<value> to replace them, "
"or set on_bad_vectors='null' to replace them with null."
)
else:
raise ValueError(
f"Vector column '{vector_column_name}' has NaNs. "
"Set on_bad_vectors='drop' to remove them, "
"set on_bad_vectors='fill' and fill_value=<value> to replace them, "
"or set on_bad_vectors='null' to replace them with null."
)
elif on_bad_vectors == "null":
vec_arr = pc.if_else(
is_bad,
pa.scalar(None),
vec_arr,
)
elif on_bad_vectors == "drop":
data = data.filter(pc.invert(is_bad))
vec_arr = data[vector_column_name]
elif on_bad_vectors == "fill":
if fill_value is None:
raise ValueError(
"`fill_value` must not be None if `on_bad_vectors` is 'fill'"
)
vec_arr = pc.if_else(
is_bad,
pa.scalar([fill_value] * dim, type=vec_arr.type),
vec_arr,
)
else:
raise ValueError(f"Invalid value for on_bad_vectors: {on_bad_vectors}")
return data.set_column(position, vector_column_name, vec_arr)
def has_nan_values(arr: Union[pa.ListArray, pa.ChunkedArray]) -> pa.BooleanArray:
if isinstance(arr, pa.ChunkedArray):
values = pa.chunked_array([chunk.flatten() for chunk in arr.chunks])
else:
values = arr.flatten()
if pa.types.is_float16(values.type):
# is_nan isn't yet implemented for f16, so we cast to f32
# https://github.com/apache/arrow/issues/45083
values_has_nan = pc.is_nan(values.cast(pa.float32()))
else:
values_has_nan = pc.is_nan(values)
values_indices = pc.list_parent_indices(arr)
has_nan_indices = pc.unique(pc.filter(values_indices, values_has_nan))
indices = pa.array(range(len(arr)), type=pa.uint32())
return pc.is_in(indices, has_nan_indices)
def _is_list_like(data_type: pa.DataType) -> bool:
return (
pa.types.is_list(data_type)
or pa.types.is_large_list(data_type)
or pa.types.is_fixed_size_list(data_type)
)
def _merge_metadata(*metadata_dicts: Optional[dict]) -> dict:
merged = {}
for metadata in metadata_dicts:
if metadata is None:
continue
for key, value in metadata.items():
if isinstance(key, str):
key = key.encode("utf-8")
if isinstance(value, str):
value = value.encode("utf-8")
merged[key] = value
return merged
def _name_suggests_vector_column(field_name: str) -> bool:
"""Check if a field name indicates a vector column."""
name_lower = field_name.lower()
return "vector" in name_lower or "embedding" in name_lower
def _infer_target_schema(
reader: pa.RecordBatchReader,
) -> Tuple[pa.Schema, pa.RecordBatchReader]:
schema = reader.schema
peeked = None
for i, field in enumerate(schema):
is_list_type = pa.types.is_list(field.type) or pa.types.is_large_list(
field.type
)
if _name_suggests_vector_column(field.name) and is_list_type:
if peeked is None:
peeked, reader = peek_reader(reader)
# Use the most common length of the list as the dimensions
dim = _modal_list_size(peeked.column(i))
# Determine target type based on value type
if pa.types.is_floating(field.type.value_type):
target_type = pa.list_(pa.float32(), dim)
elif pa.types.is_integer(field.type.value_type):
values = peeked.column(i)
if isinstance(values, pa.ChunkedArray):
values = values.combine_chunks()
flattened = values.flatten()
valid_count = pc.count(flattened, mode="only_valid").as_py()
if valid_count == 0:
target_type = pa.list_(pa.uint8(), dim)
else:
min_max = pc.min_max(flattened)
min_value = min_max["min"].as_py()
max_value = min_max["max"].as_py()
if (min_value is not None and min_value < 0) or (
max_value is not None and max_value > 255
):
target_type = pa.list_(pa.float32(), dim)
else:
target_type = pa.list_(pa.uint8(), dim)
else:
continue # Skip non-numeric types
new_field = pa.field(
field.name, # preserve original field name
target_type,
nullable=field.nullable,
)
schema = schema.set(i, new_field)
return schema, reader
def _modal_list_size(arr: Union[pa.ListArray, pa.ChunkedArray]) -> int:
# Use the most common length of the list as the dimensions
return pc.mode(pc.list_value_length(arr))[0].as_py()["mode"]
def _infer_vector_dim(arr: Union[pa.Array, pa.ChunkedArray]) -> Optional[int]:
if not _is_list_like(arr.type):
return None
lengths = pc.list_value_length(arr)
lengths = pc.filter(lengths, pc.greater(lengths, 0))
if len(lengths) == 0:
return None
return pc.mode(lengths)[0].as_py()["mode"]
def _validate_schema(schema: pa.Schema):
"""
Make sure the metadata is valid utf8
"""
if schema.metadata is not None:
_validate_metadata(schema.metadata)
def _validate_metadata(metadata: dict):
"""
Make sure the metadata values are valid utf8 (can be nested)
Raises ValueError if not valid utf8
"""
for k, v in metadata.items():
if isinstance(v, bytes):
try:
v.decode("utf8")
except UnicodeDecodeError:
raise ValueError(
f"Metadata key {k} is not valid utf8. "
"Consider base64 encode for generic binary metadata."
)
elif isinstance(v, dict):
_validate_metadata(v)
class AsyncTable:
"""
An AsyncTable is a collection of Records in a LanceDB Database.
An AsyncTable can be obtained from the
[AsyncConnection.create_table][lancedb.AsyncConnection.create_table] and
[AsyncConnection.open_table][lancedb.AsyncConnection.open_table] methods.
An AsyncTable object is expected to be long lived and reused for multiple
operations. AsyncTable objects will cache a certain amount of index data in memory.
This cache will be freed when the Table is garbage collected. To eagerly free the
cache you can call the [close][lancedb.AsyncTable.close] method. Once the
AsyncTable is closed, it cannot be used for any further operations.
An AsyncTable can also be used as a context manager, and will automatically close
when the context is exited. Closing a table is optional. If you do not close the
table, it will be closed when the AsyncTable object is garbage collected.
Examples
--------
Create using [AsyncConnection.create_table][lancedb.AsyncConnection.create_table]
(more examples in that method's documentation).
>>> import lancedb
>>> async def create_a_table():
... db = await lancedb.connect_async("./.lancedb")
... data = [{"vector": [1.1, 1.2], "b": 2}]
... table = await db.create_table("my_table", data=data)
... print(await table.query().limit(5).to_arrow())
>>> import asyncio
>>> asyncio.run(create_a_table())
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
b: int64
----
vector: [[[1.1,1.2]]]
b: [[2]]
Can append new data with [AsyncTable.add()][lancedb.table.AsyncTable.add].
>>> async def add_to_table():
... db = await lancedb.connect_async("./.lancedb")
... table = await db.open_table("my_table")
... await table.add([{"vector": [0.5, 1.3], "b": 4}])
>>> asyncio.run(add_to_table())
Can query the table with
[AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search].
>>> async def search_table_for_vector():
... db = await lancedb.connect_async("./.lancedb")
... table = await db.open_table("my_table")
... results = (
... await table.vector_search([0.4, 0.4]).select(["b", "vector"]).to_pandas()
... )
... print(results)
>>> asyncio.run(search_table_for_vector())
b vector _distance
0 4 [0.5, 1.3] 0.82
1 2 [1.1, 1.2] 1.13
Search queries are much faster when an index is created. See
[AsyncTable.create_index][lancedb.table.AsyncTable.create_index].
"""
def __init__(self, table: LanceDBTable):
"""Create a new AsyncTable object.
You should not create AsyncTable objects directly.
Use [AsyncConnection.create_table][lancedb.AsyncConnection.create_table] and
[AsyncConnection.open_table][lancedb.AsyncConnection.open_table] to obtain
Table objects."""
self._inner = table
def __repr__(self):
return self._inner.__repr__()
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
def is_open(self) -> bool:
"""Return True if the table is open."""
return self._inner.is_open()
def close(self):
"""Close the table and free any resources associated with it.
It is safe to call this method multiple times.
Any attempt to use the table after it has been closed will raise an error."""
return self._inner.close()
@property
def name(self) -> str:
"""The name of the table."""
return self._inner.name()
async def schema(self) -> pa.Schema:
"""The [Arrow Schema](https://arrow.apache.org/docs/python/api/datatypes.html#)
of this Table
"""
return await self._inner.schema()
async def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
"""
Get the embedding functions for the table
Returns
-------
funcs: Dict[str, EmbeddingFunctionConfig]
A mapping of the vector column to the embedding function
or empty dict if not configured.
"""
schema = await self.schema()
return EmbeddingFunctionRegistry.get_instance().parse_functions(schema.metadata)
async def count_rows(self, filter: Optional[str] = None) -> int:
"""
Count the number of rows in the table.
Parameters
----------
filter: str, optional
A SQL where clause to filter the rows to count.
"""
return await self._inner.count_rows(filter)
async def head(self, n=5) -> pa.Table:
"""
Return the first `n` rows of the table.
Parameters
----------
n: int, default 5
The number of rows to return.
"""
return await self.query().limit(n).to_arrow()
def query(self) -> AsyncQuery:
"""
Returns an [AsyncQuery][lancedb.query.AsyncQuery] that can be used
to search the table.
Use methods on the returned query to control query behavior. The query
can be executed with methods like [to_arrow][lancedb.query.AsyncQuery.to_arrow],
[to_pandas][lancedb.query.AsyncQuery.to_pandas] and more.
"""
return AsyncQuery(self._inner.query())
async def to_pandas(self) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Returns
-------
pd.DataFrame
"""
return (await self.to_arrow()).to_pandas()
async def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
Returns
-------
pa.Table
"""
return await self.query().to_arrow()
async def create_index(
self,
column: str,
*,
replace: Optional[bool] = None,
config: Optional[
Union[IvfFlat, IvfPq, IvfRq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS]
] = None,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
train: bool = True,
):
"""Create an index to speed up queries
Indices can be created on vector columns or scalar columns.
Indices on vector columns will speed up vector searches.
Indices on scalar columns will speed up filtering (in both
vector and non-vector searches)
Parameters
----------
column: str
The column to index.
replace: bool, default True
Whether to replace the existing index
If this is false, and another index already exists on the same columns
and the same name, then an error will be returned. This is true even if
that index is out of date.
The default is True
config: default None
For advanced configuration you can specify the type of index you would
like to create. You can also specify index-specific parameters when
creating an index object.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
train: bool, default True
Whether to train the index with existing data. Vector indices always train
with existing data.
"""
if config is not None:
if not isinstance(
config,
(
IvfFlat,
IvfSq,
IvfPq,
IvfRq,
HnswPq,
HnswSq,
BTree,
Bitmap,
LabelList,
FTS,
),
):
raise TypeError(
"config must be an instance of IvfSq, IvfPq, IvfRq, HnswPq, HnswSq,"
" BTree, Bitmap, LabelList, or FTS, but got " + str(type(config))
)
try:
await self._inner.create_index(
column,
index=config,
replace=replace,
wait_timeout=wait_timeout,
name=name,
train=train,
)
except ValueError as e:
if "not support the requested language" in str(e):
supported_langs = ", ".join(lang_mapping.values())
help_msg = f"Supported languages: {supported_langs}"
add_note(e, help_msg)
raise e
async def drop_index(self, name: str) -> None:
"""
Drop an index from the table.
Parameters
----------
name: str
The name of the index to drop.
Notes
-----
This does not delete the index from disk, it just removes it from the table.
To delete the index, run [optimize][lancedb.table.AsyncTable.optimize]
after dropping the index.
Use [list_indices][lancedb.table.AsyncTable.list_indices] to find the names
of the indices.
"""
await self._inner.drop_index(name)
async def prewarm_index(self, name: str) -> None:
"""
Prewarm an index in the table.
This is a hint to the database that the index will be accessed in the
future and should be loaded into memory if possible. This can reduce
cold-start latency for subsequent queries.
This call initiates prewarming and returns once the request is accepted.
It is idempotent and safe to call from multiple clients concurrently.
It is generally wasteful to call this if the index does not fit into the
available cache. Not all index types support prewarming; unsupported
indices will silently ignore the request.
Parameters
----------
name: str
The name of the index to prewarm
"""
await self._inner.prewarm_index(name)
async def prewarm_data(self, columns: Optional[List[str]] = None) -> None:
"""
Prewarm data for the table.
This is a hint to the database that the given columns will be accessed
in the future and the database should prefetch the data if possible.
Currently only supported on remote tables.
This call initiates prewarming and returns once the request is accepted.
It is idempotent and safe to call from multiple clients concurrently.
This operation has a large upfront cost but can speed up future queries
that need to fetch the given columns. Large columns such as embeddings
or binary data may not be practical to prewarm. This feature is intended
for workloads that issue many queries against the same columns.
Parameters
----------
columns: list of str, optional
The columns to prewarm. If None, all columns are prewarmed.
"""
await self._inner.prewarm_data(columns)
async def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
"""
Wait for indexing to complete for the given index names.
This will poll the table until all the indices are fully indexed,
or raise a timeout exception if the timeout is reached.
Parameters
----------
index_names: str
The name of the indices to poll
timeout: timedelta
Timeout to wait for asynchronous indexing. The default is 5 minutes.
"""
await self._inner.wait_for_index(index_names, timeout)
async def stats(self) -> TableStatistics:
"""
Retrieve table and fragment statistics.
"""
return await self._inner.stats()
async def uri(self) -> str:
"""
Get the table URI (storage location).
For remote tables, this fetches the location from the server via describe.
For local tables, this returns the dataset URI.
Returns
-------
str
The full storage location of the table (e.g., S3/GCS path).
"""
return await self._inner.uri()
async def initial_storage_options(self) -> Optional[Dict[str, str]]:
"""Get the initial storage options that were passed in when opening this table.
For dynamically refreshed options (e.g., credential vending), use
:meth:`latest_storage_options`.
Warning: This is an internal API and the return value is subject to change.
Returns
-------
Optional[Dict[str, str]]
The storage options, or None if no storage options were configured.
"""
return await self._inner.initial_storage_options()
async def latest_storage_options(self) -> Optional[Dict[str, str]]:
"""Get the latest storage options, refreshing from provider if configured.
This method is useful for credential vending scenarios where storage options
may be refreshed dynamically. If no dynamic provider is configured, this
returns the initial static options.
Warning: This is an internal API and the return value is subject to change.
Returns
-------
Optional[Dict[str, str]]
The storage options, or None if no storage options were configured.
"""
return await self._inner.latest_storage_options()
async def add(
self,
data: DATA,
*,
mode: Optional[Literal["append", "overwrite"]] = "append",
on_bad_vectors: Optional[OnBadVectorsType] = None,
fill_value: Optional[float] = None,
progress: Optional[Union[bool, Callable, Any]] = None,
) -> AddResult:
"""Add more data to the [Table](Table).
Parameters
----------
data: DATA
The data to insert into the table. Acceptable types are:
- list-of-dict
- pandas.DataFrame
- pyarrow.Table or pyarrow.RecordBatch
mode: str
The mode to use when writing the data. Valid values are
"append" and "overwrite".
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill", "null".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
progress: callable or tqdm-like, optional
A callback or tqdm-compatible progress bar. See
:meth:`Table.add` for details.
"""
schema = await self.schema()
if on_bad_vectors is None:
on_bad_vectors = "error"
if fill_value is None:
fill_value = 0.0
# _santitize_data is an old code path, but we will use it until the
# new code path is ready.
if mode == "overwrite":
# For overwrite, apply the same preprocessing as create_table
# so vector columns are inferred as FixedSizeList.
data, _ = sanitize_create_table(
data, None, on_bad_vectors=on_bad_vectors, fill_value=fill_value
)
elif on_bad_vectors != "error" or (
schema.metadata is not None and b"embedding_functions" in schema.metadata
):
data = _sanitize_data(
data,
schema,
metadata=schema.metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
allow_subschema=True,
)
_register_optional_converters()
data = to_scannable(data)
progress, owns = _normalize_progress(progress)
try:
return await self._inner.add(data, mode or "append", progress=progress)
except RuntimeError as e:
if "Cast error" in str(e):
raise ValueError(e)
elif "Vector column contains NaN" in str(e):
raise ValueError(e)
else:
raise
finally:
if owns:
progress.close()
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation
This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")
The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.
"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data) "Not matched by source" records exist only
in the target table (this is old data)
The builder returned by this method can be used to customize what
should happen for each category of data.
Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.
Parameters
----------
on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.
Examples
--------
>>> import lancedb
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> res = table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
""" # noqa: E501
on = [on] if isinstance(on, str) else list(iter(on))
return LanceMergeInsertBuilder(self, on)
@overload
async def search(
self,
query: Optional[str] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["auto"] = ...,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> Union[AsyncHybridQuery, AsyncFTSQuery, AsyncVectorQuery]: ...
@overload
async def search(
self,
query: Optional[str] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["hybrid"] = ...,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> AsyncHybridQuery: ...
@overload
async def search(
self,
query: Optional[Union[VEC, "PIL.Image.Image", Tuple]] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["auto"] = ...,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> AsyncVectorQuery: ...
@overload
async def search(
self,
query: Optional[str] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["fts"] = ...,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> AsyncFTSQuery: ...
@overload
async def search(
self,
query: Optional[
Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]
] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["vector"] = ...,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> AsyncVectorQuery: ...
async def search(
self,
query: Optional[
Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]
] = None,
vector_column_name: Optional[str] = None,
query_type: QueryType = "auto",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> Union[AsyncHybridQuery, AsyncFTSQuery, AsyncVectorQuery]:
"""Create a search query to find the nearest neighbors
of the given query vector. We currently support [vector search][search]
and [full-text search][experimental-full-text-search].
All query options are defined in [AsyncQuery][lancedb.query.AsyncQuery].
Parameters
----------
query: list/np.ndarray/str/PIL.Image.Image, default None
The targetted vector to search for.
- *default None*.
Acceptable types are: list, np.ndarray, PIL.Image.Image
- If None then the select/where/limit clauses are applied to filter
the table
vector_column_name: str, optional
The name of the vector column to search.
The vector column needs to be a pyarrow fixed size list type
- If not specified then the vector column is inferred from
the table schema
- If the table has multiple vector columns then the *vector_column_name*
needs to be specified. Otherwise, an error is raised.
query_type: str
*default "auto"*.
Acceptable types are: "vector", "fts", "hybrid", or "auto"
- If "auto" then the query type is inferred from the query;
- If `query` is a list/np.ndarray then the query type is
"vector";
- If `query` is a PIL.Image.Image then either do vector search,
or raise an error if no corresponding embedding function is found.
- If `query` is a string, then the query type is "vector" if the
table has embedding functions else the query type is "fts"
Returns
-------
LanceQueryBuilder
A query builder object representing the query.
"""
def is_embedding(query):
return isinstance(query, (list, np.ndarray, pa.Array, pa.ChunkedArray))
async def get_embedding_func(
vector_column_name: Optional[str],
query_type: QueryType,
query: Optional[Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]],
) -> Tuple[str, EmbeddingFunctionConfig]:
if isinstance(query, FullTextQuery):
query_type = "fts"
schema = await self.schema()
vector_column_name = infer_vector_column_name(
schema=schema,
query_type=query_type,
query=query,
vector_column_name=vector_column_name,
)
funcs = EmbeddingFunctionRegistry.get_instance().parse_functions(
schema.metadata
)
func = funcs.get(vector_column_name)
if func is None:
error = ValueError(
f"Column '{vector_column_name}' has no registered "
"embedding function."
)
if len(funcs) > 0:
add_note(
error,
"Embedding functions are registered for columns: "
f"{list(funcs.keys())}",
)
else:
add_note(
error, "No embedding functions are registered for any columns."
)
raise error
return vector_column_name, func
async def make_embedding(embedding, query):
if embedding is not None:
loop = asyncio.get_running_loop()
# This function is likely to block, since it either calls an expensive
# function or makes an HTTP request to an embeddings REST API.
return (
await loop.run_in_executor(
None,
embedding.function.compute_query_embeddings_with_retry,
query,
)
)[0]
else:
return None
if query_type == "auto":
# Infer the query type.
if is_embedding(query):
vector_query = query
query_type = "vector"
elif isinstance(query, FullTextQuery):
query_type = "fts"
elif isinstance(query, str):
try:
(
indices,
(vector_column_name, embedding_conf),
) = await asyncio.gather(
self.list_indices(),
get_embedding_func(vector_column_name, "auto", query),
)
except ValueError as e:
if "Column" in str(
e
) and "has no registered embedding function" in str(e):
# If the column has no registered embedding function,
# then it's an FTS query.
query_type = "fts"
else:
raise e
else:
if embedding_conf is not None:
vector_query = await make_embedding(embedding_conf, query)
if any(
i.columns[0] == embedding_conf.source_column
and i.index_type == "FTS"
for i in indices
):
query_type = "hybrid"
else:
query_type = "vector"
else:
query_type = "fts"
else:
# it's an image or something else embeddable.
query_type = "vector"
elif query_type == "vector":
if is_embedding(query):
vector_query = query
else:
vector_column_name, embedding_conf = await get_embedding_func(
vector_column_name, query_type, query
)
vector_query = await make_embedding(embedding_conf, query)
elif query_type == "hybrid":
if is_embedding(query):
raise ValueError("Hybrid search requires a text query")
else:
vector_column_name, embedding_conf = await get_embedding_func(
vector_column_name, query_type, query
)
vector_query = await make_embedding(embedding_conf, query)
if query_type == "vector":
builder = self.query().nearest_to(vector_query)
if vector_column_name:
builder = builder.column(vector_column_name)
return builder
elif query_type == "fts":
return self.query().nearest_to_text(query, columns=fts_columns)
elif query_type == "hybrid":
builder = self.query().nearest_to(vector_query)
if vector_column_name:
builder = builder.column(vector_column_name)
return builder.nearest_to_text(query, columns=fts_columns)
else:
raise ValueError(f"Unknown query type: '{query_type}'")
def vector_search(
self,
query_vector: Union[VEC, Tuple],
) -> AsyncVectorQuery:
"""
Search the table with a given query vector.
This is a convenience method for preparing a vector query and
is the same thing as calling `nearestTo` on the builder returned
by `query`. Seer [nearest_to][lancedb.query.AsyncQuery.nearest_to] for more
details.
"""
return self.query().nearest_to(query_vector)
def _sync_query_to_async(
self, query: Query
) -> AsyncHybridQuery | AsyncFTSQuery | AsyncVectorQuery | AsyncQuery:
async_query = self.query()
if query.limit is not None:
async_query = async_query.limit(query.limit)
if query.offset is not None:
async_query = async_query.offset(query.offset)
if query.columns:
async_query = async_query.select(query.columns)
if query.filter is not None:
async_query = async_query.where(query.filter)
if query.fast_search:
async_query = async_query.fast_search()
if query.with_row_id:
async_query = async_query.with_row_id()
if query.vector:
async_query = async_query.nearest_to(query.vector).distance_range(
query.lower_bound, query.upper_bound
)
if query.distance_type is not None:
async_query = async_query.distance_type(query.distance_type)
if query.minimum_nprobes is not None and query.maximum_nprobes is not None:
# Set both to the minimum first to avoid min > max error.
async_query = async_query.nprobes(
query.minimum_nprobes
).maximum_nprobes(query.maximum_nprobes)
elif query.minimum_nprobes is not None:
async_query = async_query.minimum_nprobes(query.minimum_nprobes)
elif query.maximum_nprobes is not None:
async_query = async_query.maximum_nprobes(query.maximum_nprobes)
if query.refine_factor is not None:
async_query = async_query.refine_factor(query.refine_factor)
if query.vector_column:
async_query = async_query.column(query.vector_column)
if query.ef:
async_query = async_query.ef(query.ef)
if query.bypass_vector_index:
async_query = async_query.bypass_vector_index()
if query.postfilter:
async_query = async_query.postfilter()
if query.full_text_query:
async_query = async_query.nearest_to_text(
query.full_text_query.query, query.full_text_query.columns
)
return async_query
async def _execute_query(
self,
query: Query,
*,
batch_size: Optional[int] = None,
timeout: Optional[timedelta] = None,
) -> pa.RecordBatchReader:
# The sync table calls into this method, so we need to map the
# query to the async version of the query and run that here. This is only
# used for that code path right now.
async_query = self._sync_query_to_async(query)
return await async_query.to_batches(
max_batch_length=batch_size, timeout=timeout
)
async def _explain_plan(self, query: Query, verbose: Optional[bool]) -> str:
# This method is used by the sync table
async_query = self._sync_query_to_async(query)
return await async_query.explain_plan(verbose)
async def _analyze_plan(self, query: Query) -> str:
# This method is used by the sync table
async_query = self._sync_query_to_async(query)
return await async_query.analyze_plan()
async def _output_schema(self, query: Query) -> pa.Schema:
async_query = self._sync_query_to_async(query)
return await async_query.output_schema()
async def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
) -> MergeResult:
schema = await self.schema()
if on_bad_vectors is None:
on_bad_vectors = "error"
if fill_value is None:
fill_value = 0.0
data = _sanitize_data(
new_data,
schema,
metadata=schema.metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
allow_subschema=True,
)
if isinstance(data, pa.Table):
data = pa.RecordBatchReader.from_batches(data.schema, data.to_batches())
return await self._inner.execute_merge_insert(
data,
dict(
on=merge._on,
when_matched_update_all=merge._when_matched_update_all,
when_matched_update_all_condition=merge._when_matched_update_all_condition,
when_not_matched_insert_all=merge._when_not_matched_insert_all,
when_not_matched_by_source_delete=merge._when_not_matched_by_source_delete,
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
),
)
async def delete(self, where: str) -> DeleteResult:
"""Delete rows from the table.
This can be used to delete a single row, many rows, all rows, or
sometimes no rows (if your predicate matches nothing).
Parameters
----------
where: str
The SQL where clause to use when deleting rows.
- For example, 'x = 2' or 'x IN (1, 2, 3)'.
The filter must not be empty, or it will error.
Examples
--------
>>> import lancedb
>>> data = [
... {"x": 1, "vector": [1.0, 2]},
... {"x": 2, "vector": [3.0, 4]},
... {"x": 3, "vector": [5.0, 6]}
... ]
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
1 3 [5.0, 6.0]
If you have a list of values to delete, you can combine them into a
stringified list and use the `IN` operator:
>>> to_remove = [1, 5]
>>> to_remove = ", ".join([str(v) for v in to_remove])
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]
"""
return await self._inner.delete(where)
async def update(
self,
updates: Optional[Dict[str, Any]] = None,
*,
where: Optional[str] = None,
updates_sql: Optional[Dict[str, str]] = None,
) -> UpdateResult:
"""
This can be used to update zero to all rows in the table.
If a filter is provided with `where` then only rows matching the
filter will be updated. Otherwise all rows will be updated.
Parameters
----------
updates: dict, optional
The updates to apply. The keys should be the name of the column to
update. The values should be the new values to assign. This is
required unless updates_sql is supplied.
where: str, optional
An SQL filter that controls which rows are updated. For example, 'x = 2'
or 'x IN (1, 2, 3)'. Only rows that satisfy this filter will be udpated.
updates_sql: dict, optional
The updates to apply, expressed as SQL expression strings. The keys should
be column names. The values should be SQL expressions. These can be SQL
literals (e.g. "7" or "'foo'") or they can be expressions based on the
previous value of the row (e.g. "x + 1" to increment the x column by 1)
Returns
-------
UpdateResult
An object containing:
- rows_updated: The number of rows that were updated
- version: The new version number of the table after the update
Examples
--------
>>> import asyncio
>>> import lancedb
>>> import pandas as pd
>>> async def demo_update():
... data = pd.DataFrame({"x": [1, 2], "vector": [[1, 2], [3, 4]]})
... db = await lancedb.connect_async("./.lancedb")
... table = await db.create_table("my_table", data)
... # x is [1, 2], vector is [[1, 2], [3, 4]]
... await table.update({"vector": [10, 10]}, where="x = 2")
... # x is [1, 2], vector is [[1, 2], [10, 10]]
... await table.update(updates_sql={"x": "x + 1"})
... # x is [2, 3], vector is [[1, 2], [10, 10]]
>>> asyncio.run(demo_update())
"""
if updates is not None and updates_sql is not None:
raise ValueError("Only one of updates or updates_sql can be provided")
if updates is None and updates_sql is None:
raise ValueError("Either updates or updates_sql must be provided")
if updates is not None:
updates_sql = {k: value_to_sql(v) for k, v in updates.items()}
return await self._inner.update(updates_sql, where)
async def add_columns(
self, transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema
) -> AddColumnsResult:
"""
Add new columns with defined values.
Parameters
----------
transforms: Dict[str, str]
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
Alternatively, you can pass a pyarrow field or schema to add
new columns with NULLs.
Returns
-------
AddColumnsResult
version: the new version number of the table after adding columns.
"""
if isinstance(transforms, pa.Field):
transforms = [transforms]
if isinstance(transforms, list) and all(
{isinstance(f, pa.Field) for f in transforms}
):
transforms = pa.schema(transforms)
if isinstance(transforms, pa.Schema):
return await self._inner.add_columns_with_schema(transforms)
else:
return await self._inner.add_columns(list(transforms.items()))
async def alter_columns(
self, *alterations: Iterable[dict[str, Any]]
) -> AlterColumnsResult:
"""
Alter column names and nullability.
alterations : Iterable[Dict[str, Any]]
A sequence of dictionaries, each with the following keys:
- "path": str
The column path to alter. For a top-level column, this is the name.
For a nested column, this is the dot-separated path, e.g. "a.b.c".
- "rename": str, optional
The new name of the column. If not specified, the column name is
not changed.
- "data_type": pyarrow.DataType, optional
The new data type of the column. Existing values will be casted
to this type. If not specified, the column data type is not changed.
- "nullable": bool, optional
Whether the column should be nullable. If not specified, the column
nullability is not changed. Only non-nullable columns can be changed
to nullable. Currently, you cannot change a nullable column to
non-nullable.
Returns
-------
AlterColumnsResult
version: the new version number of the table after the alteration.
"""
return await self._inner.alter_columns(alterations)
async def drop_columns(self, columns: Iterable[str]):
"""
Drop columns from the table.
Parameters
----------
columns : Iterable[str]
The names of the columns to drop.
"""
return await self._inner.drop_columns(columns)
async def version(self) -> int:
"""
Retrieve the version of the table
LanceDb supports versioning. Every operation that modifies the table increases
version. As long as a version hasn't been deleted you can `[Self::checkout]`
that version to view the data at that point. In addition, you can
`[Self::restore]` the version to replace the current table with a previous
version.
"""
return await self._inner.version()
async def list_versions(self):
"""
List all versions of the table
"""
versions = await self._inner.list_versions()
for v in versions:
ts_nanos = v["timestamp"]
v["timestamp"] = datetime.fromtimestamp(ts_nanos // 1e9) + timedelta(
microseconds=(ts_nanos % 1e9) // 1e3
)
return versions
async def checkout(self, version: int | str):
"""
Checks out a specific version of the Table
Any read operation on the table will now access the data at the checked out
version. As a consequence, calling this method will disable any read consistency
interval that was previously set.
This is a read-only operation that turns the table into a sort of "view"
or "detached head". Other table instances will not be affected. To make the
change permanent you can use the `[Self::restore]` method.
Any operation that modifies the table will fail while the table is in a checked
out state.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
To return the table to a normal state use `[Self::checkout_latest]`
"""
try:
await self._inner.checkout(version)
except RuntimeError as e:
if "not found" in str(e):
raise ValueError(
f"Version {version} no longer exists. Was it cleaned up?"
)
else:
raise
async def checkout_latest(self):
"""
Ensures the table is pointing at the latest version
This can be used to manually update a table when the read_consistency_interval
is None
It can also be used to undo a `[Self::checkout]` operation
"""
await self._inner.checkout_latest()
async def restore(self, version: Optional[int | str] = None):
"""
Restore the table to the currently checked out version
This operation will fail if checkout has not been called previously
This operation will overwrite the latest version of the table with a
previous version. Any changes made since the checked out version will
no longer be visible.
Once the operation concludes the table will no longer be in a checked
out state and the read_consistency_interval, if any, will apply.
"""
await self._inner.restore(version)
def take_offsets(self, offsets: list[int]) -> AsyncTakeQuery:
"""
Take a list of offsets from the table.
Offsets are 0-indexed and relative to the current version of the table. Offsets
are not stable. A row with an offset of N may have a different offset in a
different version of the table (e.g. if an earlier row is deleted).
Offsets are mostly useful for sampling as the set of all valid offsets is easily
known in advance to be [0, len(table)).
Parameters
----------
offsets: list[int]
The offsets to take.
Returns
-------
pa.RecordBatch
A record batch containing the rows at the given offsets.
"""
return AsyncTakeQuery(self._inner.take_offsets(offsets))
def take_row_ids(self, row_ids: list[int]) -> AsyncTakeQuery:
"""
Take a list of row ids from the table.
Row ids are not stable and are relative to the current version of the table.
They can change due to compaction and updates.
Unlike offsets, row ids are not 0-indexed and no assumptions should be made
about the possible range of row ids. In order to use this method you must
first obtain the row ids by scanning or searching the table.
Even so, row ids are more stable than offsets and can be useful in some
situations.
There is an ongoing effort to make row ids stable which is tracked at
https://github.com/lancedb/lancedb/issues/1120
Parameters
----------
row_ids: list[int]
The row ids to take.
Returns
-------
AsyncTakeQuery
A query object that can be executed to get the rows.
"""
return AsyncTakeQuery(self._inner.take_row_ids(row_ids))
@property
def tags(self) -> AsyncTags:
"""Tag management for the dataset.
Similar to Git, tags are a way to add metadata to a specific version of the
dataset.
.. warning::
Tagged versions are exempted from the
:py:meth:`optimize(cleanup_older_than)` process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
"""
return AsyncTags(self._inner)
async def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
retrain=False,
) -> OptimizeStats:
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
in-progress operation (e.g. appending new data) and these files will not
be deleted unless they are at least 7 days old. If delete_unverified is True
then these files will be deleted regardless of their age.
.. warning::
This should only be set to True if you can guarantee that no other
process is currently working on this dataset. Otherwise the dataset
could be put into a corrupted state.
retrain: bool, default False
This parameter is no longer used and is deprecated.
The frequency an application should call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
cleanup_since_ms: Optional[int] = None
if cleanup_older_than is not None:
cleanup_since_ms = round(cleanup_older_than.total_seconds() * 1000)
if retrain:
import warnings
warnings.warn(
"The 'retrain' parameter is deprecated and will be removed in a "
"future version.",
DeprecationWarning,
)
return await self._inner.optimize(
cleanup_since_ms=cleanup_since_ms,
delete_unverified=delete_unverified,
)
async def list_indices(self) -> Iterable[IndexConfig]:
"""
List all indices that have been created with Self::create_index
"""
return await self._inner.list_indices()
async def index_stats(self, index_name: str) -> Optional[IndexStatistics]:
"""
Retrieve statistics about an index
Parameters
----------
index_name: str
The name of the index to retrieve statistics for
Returns
-------
IndexStatistics or None
The statistics about the index. Returns None if the index does not exist.
"""
stats = await self._inner.index_stats(index_name)
if stats is None:
return None
else:
return IndexStatistics(**stats)
async def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
Returns
-------
bool
True if the table is using the new v2 manifest paths, False otherwise.
"""
return await self._inner.uses_v2_manifest_paths()
async def migrate_manifest_paths_v2(self):
"""
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without
changing the state of the object store.
!!! danger
This should not be run while other concurrent operations are happening.
And it should also run until completion before resuming other operations.
You can use
[AsyncTable.uses_v2_manifest_paths][lancedb.table.AsyncTable.uses_v2_manifest_paths]
to check if the table is already using the new path style.
"""
await self._inner.migrate_manifest_paths_v2()
async def replace_field_metadata(
self, field_name: str, new_metadata: dict[str, str]
):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
await self._inner.replace_field_metadata(field_name, new_metadata)
@dataclass
class IndexStatistics:
"""
Statistics about an index.
Attributes
----------
num_indexed_rows: int
The number of rows that are covered by this index.
num_unindexed_rows: int
The number of rows that are not covered by this index.
index_type: str
The type of index that was created.
distance_type: Optional[str]
The distance type used by the index.
num_indices: Optional[int]
The number of parts the index is split into.
loss: Optional[float]
The KMeans loss for the index, for only vector indices.
"""
num_indexed_rows: int
num_unindexed_rows: int
index_type: Literal[
"IVF_FLAT",
"IVF_SQ",
"IVF_PQ",
"IVF_RQ",
"IVF_HNSW_SQ",
"IVF_HNSW_PQ",
"FTS",
"BTREE",
"BITMAP",
"LABEL_LIST",
]
distance_type: Optional[Literal["l2", "cosine", "dot"]] = None
num_indices: Optional[int] = None
loss: Optional[float] = None
# This exists for backwards compatibility with an older API, which returned
# a dictionary instead of a class.
def __getitem__(self, key):
return getattr(self, key)
@dataclass
class TableStatistics:
"""
Statistics about a table and fragments.
Attributes
----------
total_bytes: int
The total number of bytes in the table.
num_rows: int
The total number of rows in the table.
num_indices: int
The total number of indices in the table.
fragment_stats: FragmentStatistics
Statistics about fragments in the table.
"""
total_bytes: int
num_rows: int
num_indices: int
fragment_stats: FragmentStatistics
@dataclass
class FragmentStatistics:
"""
Statistics about fragments.
Attributes
----------
num_fragments: int
The total number of fragments in the table.
num_small_fragments: int
The total number of small fragments in the table.
Small fragments have low row counts and may need to be compacted.
lengths: FragmentSummaryStats
Statistics about the number of rows in the table fragments.
"""
num_fragments: int
num_small_fragments: int
lengths: FragmentSummaryStats
@dataclass
class FragmentSummaryStats:
"""
Statistics about fragments sizes
Attributes
----------
min: int
The number of rows in the fragment with the fewest rows.
max: int
The number of rows in the fragment with the most rows.
mean: int
The mean number of rows in the fragments.
p25: int
The 25th percentile of number of rows in the fragments.
p50: int
The 50th percentile of number of rows in the fragments.
p75: int
The 75th percentile of number of rows in the fragments.
p99: int
The 99th percentile of number of rows in the fragments.
"""
min: int
max: int
mean: int
p25: int
p50: int
p75: int
p99: int
class Tags:
"""
Table tag manager.
"""
def __init__(self, table):
self._table = table
def list(self) -> Dict[str, Tag]:
"""
List all table tags.
Returns
-------
dict[str, Tag]
A dictionary mapping tag names to version numbers.
"""
return LOOP.run(self._table.tags.list())
def get_version(self, tag: str) -> int:
"""
Get the version of a tag.
Parameters
----------
tag: str,
The name of the tag to get the version for.
"""
return LOOP.run(self._table.tags.get_version(tag))
def create(self, tag: str, version: int) -> None:
"""
Create a tag for a given table version.
Parameters
----------
tag: str,
The name of the tag to create. This name must be unique among all tag
names for the table.
version: int,
The table version to tag.
"""
LOOP.run(self._table.tags.create(tag, version))
def delete(self, tag: str) -> None:
"""
Delete tag from the table.
Parameters
----------
tag: str,
The name of the tag to delete.
"""
LOOP.run(self._table.tags.delete(tag))
def update(self, tag: str, version: int) -> None:
"""
Update tag to a new version.
Parameters
----------
tag: str,
The name of the tag to update.
version: int,
The new table version to tag.
"""
LOOP.run(self._table.tags.update(tag, version))
class AsyncTags:
"""
Async table tag manager.
"""
def __init__(self, table):
self._table = table
async def list(self) -> Dict[str, Tag]:
"""
List all table tags.
Returns
-------
dict[str, Tag]
A dictionary mapping tag names to version numbers.
"""
return await self._table.tags.list()
async def get_version(self, tag: str) -> int:
"""
Get the version of a tag.
Parameters
----------
tag: str,
The name of the tag to get the version for.
"""
return await self._table.tags.get_version(tag)
async def create(self, tag: str, version: int) -> None:
"""
Create a tag for a given table version.
Parameters
----------
tag: str,
The name of the tag to create. This name must be unique among all tag
names for the table.
version: int,
The table version to tag.
"""
await self._table.tags.create(tag, version)
async def delete(self, tag: str) -> None:
"""
Delete tag from the table.
Parameters
----------
tag: str,
The name of the tag to delete.
"""
await self._table.tags.delete(tag)
async def update(self, tag: str, version: int) -> None:
"""
Update tag to a new version.
Parameters
----------
tag: str,
The name of the tag to update.
version: int,
The new table version to tag.
"""
await self._table.tags.update(tag, version)