mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-09 13:22:58 +00:00
feat(python): support hybrid search in async sdk (#1915)
fixes: https://github.com/lancedb/lancedb/issues/1765 --------- Co-authored-by: Will Jones <willjones127@gmail.com>
This commit is contained in:
@@ -79,9 +79,21 @@ class Query:
|
||||
def limit(self, limit: int): ...
|
||||
def offset(self, offset: int): ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
|
||||
def nearest_to_text(self, query: dict) -> Query: ...
|
||||
def nearest_to_text(self, query: dict) -> FTSQuery: ...
|
||||
async def execute(self, max_batch_legnth: Optional[int]) -> RecordBatchStream: ...
|
||||
|
||||
class FTSQuery:
|
||||
def where(self, filter: str): ...
|
||||
def select(self, columns: List[str]): ...
|
||||
def limit(self, limit: int): ...
|
||||
def offset(self, offset: int): ...
|
||||
def fast_search(self): ...
|
||||
def with_row_id(self): ...
|
||||
def postfilter(self): ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
|
||||
async def execute(self, max_batch_length: Optional[int]) -> RecordBatchStream: ...
|
||||
async def explain_plan(self) -> str: ...
|
||||
|
||||
class VectorQuery:
|
||||
async def execute(self) -> RecordBatchStream: ...
|
||||
def where(self, filter: str): ...
|
||||
@@ -95,6 +107,24 @@ class VectorQuery:
|
||||
def refine_factor(self, refine_factor: int): ...
|
||||
def nprobes(self, nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def nearest_to_text(self, query: dict) -> HybridQuery: ...
|
||||
|
||||
class HybridQuery:
|
||||
def where(self, filter: str): ...
|
||||
def select(self, columns: List[str]): ...
|
||||
def limit(self, limit: int): ...
|
||||
def offset(self, offset: int): ...
|
||||
def fast_search(self): ...
|
||||
def with_row_id(self): ...
|
||||
def postfilter(self): ...
|
||||
def distance_type(self, distance_type: str): ...
|
||||
def refine_factor(self, refine_factor: int): ...
|
||||
def nprobes(self, nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def to_vector_query(self) -> VectorQuery: ...
|
||||
def to_fts_query(self) -> FTSQuery: ...
|
||||
def get_limit(self) -> int: ...
|
||||
def get_with_row_id(self) -> bool: ...
|
||||
|
||||
class CompactionStats:
|
||||
fragments_removed: int
|
||||
|
||||
@@ -26,6 +26,7 @@ from typing import (
|
||||
Union,
|
||||
)
|
||||
|
||||
import asyncio
|
||||
import deprecation
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
@@ -44,6 +45,8 @@ if TYPE_CHECKING:
|
||||
import polars as pl
|
||||
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import FTSQuery as LanceFTSQuery
|
||||
from ._lancedb import HybridQuery as LanceHybridQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from .common import VEC
|
||||
from .pydantic import LanceModel
|
||||
@@ -1124,35 +1127,55 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
|
||||
fts_results = fts_future.result()
|
||||
vector_results = vector_future.result()
|
||||
|
||||
# convert to ranks first if needed
|
||||
if self._norm == "rank":
|
||||
vector_results = self._rank(vector_results, "_distance")
|
||||
fts_results = self._rank(fts_results, "_score")
|
||||
return self._combine_hybrid_results(
|
||||
fts_results=fts_results,
|
||||
vector_results=vector_results,
|
||||
norm=self._norm,
|
||||
fts_query=self._fts_query._query,
|
||||
reranker=self._reranker,
|
||||
limit=self._limit,
|
||||
with_row_ids=self._with_row_id,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _combine_hybrid_results(
|
||||
fts_results: pa.Table,
|
||||
vector_results: pa.Table,
|
||||
norm: str,
|
||||
fts_query: str,
|
||||
reranker,
|
||||
limit: int,
|
||||
with_row_ids: bool,
|
||||
) -> pa.Table:
|
||||
if norm == "rank":
|
||||
vector_results = LanceHybridQueryBuilder._rank(vector_results, "_distance")
|
||||
fts_results = LanceHybridQueryBuilder._rank(fts_results, "_score")
|
||||
|
||||
# normalize the scores to be between 0 and 1, 0 being most relevant
|
||||
vector_results = self._normalize_scores(vector_results, "_distance")
|
||||
vector_results = LanceHybridQueryBuilder._normalize_scores(
|
||||
vector_results, "_distance"
|
||||
)
|
||||
|
||||
# In fts higher scores represent relevance. Not inverting them here as
|
||||
# rerankers might need to preserve this score to support `return_score="all"`
|
||||
fts_results = self._normalize_scores(fts_results, "_score")
|
||||
fts_results = LanceHybridQueryBuilder._normalize_scores(fts_results, "_score")
|
||||
|
||||
results = self._reranker.rerank_hybrid(
|
||||
self._fts_query._query, vector_results, fts_results
|
||||
)
|
||||
results = reranker.rerank_hybrid(fts_query, vector_results, fts_results)
|
||||
|
||||
check_reranker_result(results)
|
||||
|
||||
# apply limit after reranking
|
||||
results = results.slice(length=self._limit)
|
||||
results = results.slice(length=limit)
|
||||
|
||||
if not self._with_row_id:
|
||||
if not with_row_ids:
|
||||
results = results.drop(["_rowid"])
|
||||
|
||||
return results
|
||||
|
||||
def to_batches(self):
|
||||
raise NotImplementedError("to_batches not yet supported on a hybrid query")
|
||||
|
||||
def _rank(self, results: pa.Table, column: str, ascending: bool = True):
|
||||
@staticmethod
|
||||
def _rank(results: pa.Table, column: str, ascending: bool = True):
|
||||
if len(results) == 0:
|
||||
return results
|
||||
# Get the _score column from results
|
||||
@@ -1169,7 +1192,8 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
|
||||
)
|
||||
return results
|
||||
|
||||
def _normalize_scores(self, results: pa.Table, column: str, invert=False):
|
||||
@staticmethod
|
||||
def _normalize_scores(results: pa.Table, column: str, invert=False):
|
||||
if len(results) == 0:
|
||||
return results
|
||||
# Get the _score column from results
|
||||
@@ -1635,7 +1659,7 @@ class AsyncQuery(AsyncQueryBase):
|
||||
|
||||
def nearest_to_text(
|
||||
self, query: str, columns: Union[str, List[str]] = []
|
||||
) -> AsyncQuery:
|
||||
) -> AsyncFTSQuery:
|
||||
"""
|
||||
Find the documents that are most relevant to the given text query.
|
||||
|
||||
@@ -1658,8 +1682,90 @@ class AsyncQuery(AsyncQueryBase):
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns})
|
||||
return self
|
||||
return AsyncFTSQuery(
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns})
|
||||
)
|
||||
|
||||
|
||||
class AsyncFTSQuery(AsyncQueryBase):
|
||||
"""A query for full text search for LanceDB."""
|
||||
|
||||
def __init__(self, inner: LanceFTSQuery):
|
||||
super().__init__(inner)
|
||||
self._inner = inner
|
||||
|
||||
def get_query(self):
|
||||
self._inner.get_query()
|
||||
|
||||
def nearest_to(
|
||||
self,
|
||||
query_vector: Union[VEC, Tuple, List[VEC]],
|
||||
) -> AsyncHybridQuery:
|
||||
"""
|
||||
In addition doing text search on the LanceDB Table, also
|
||||
find the nearest vectors to the given query vector.
|
||||
|
||||
This converts the query from a FTS Query to a Hybrid query. Results
|
||||
from the vector search will be combined with results from the FTS query.
|
||||
|
||||
This method will attempt to convert the input to the query vector
|
||||
expected by the embedding model. If the input cannot be converted
|
||||
then an error will be thrown.
|
||||
|
||||
By default, there is no embedding model, and the input should be
|
||||
something that can be converted to a pyarrow array of floats. This
|
||||
includes lists, numpy arrays, and tuples.
|
||||
|
||||
If there is only one vector column (a column whose data type is a
|
||||
fixed size list of floats) then the column does not need to be specified.
|
||||
If there is more than one vector column you must use
|
||||
[AsyncVectorQuery.column][lancedb.query.AsyncVectorQuery.column] to specify
|
||||
which column you would like to compare with.
|
||||
|
||||
If no index has been created on the vector column then a vector query
|
||||
will perform a distance comparison between the query vector and every
|
||||
vector in the database and then sort the results. This is sometimes
|
||||
called a "flat search"
|
||||
|
||||
For small databases, with tens of thousands of vectors or less, this can
|
||||
be reasonably fast. In larger databases you should create a vector index
|
||||
on the column. If there is a vector index then an "approximate" nearest
|
||||
neighbor search (frequently called an ANN search) will be performed. This
|
||||
search is much faster, but the results will be approximate.
|
||||
|
||||
The query can be further parameterized using the returned builder. There
|
||||
are various ANN search parameters that will let you fine tune your recall
|
||||
accuracy vs search latency.
|
||||
|
||||
Hybrid searches always have a [limit][]. If `limit` has not been called then
|
||||
a default `limit` of 10 will be used.
|
||||
|
||||
Typically, a single vector is passed in as the query. However, you can also
|
||||
pass in multiple vectors. This can be useful if you want to find the nearest
|
||||
vectors to multiple query vectors. This is not expected to be faster than
|
||||
making multiple queries concurrently; it is just a convenience method.
|
||||
If multiple vectors are passed in then an additional column `query_index`
|
||||
will be added to the results. This column will contain the index of the
|
||||
query vector that the result is nearest to.
|
||||
"""
|
||||
if query_vector is None:
|
||||
raise ValueError("query_vector can not be None")
|
||||
|
||||
if (
|
||||
isinstance(query_vector, list)
|
||||
and len(query_vector) > 0
|
||||
and not isinstance(query_vector[0], (float, int))
|
||||
):
|
||||
# multiple have been passed
|
||||
query_vectors = [AsyncQuery._query_vec_to_array(v) for v in query_vector]
|
||||
new_self = self._inner.nearest_to(query_vectors[0])
|
||||
for v in query_vectors[1:]:
|
||||
new_self.add_query_vector(v)
|
||||
return AsyncHybridQuery(new_self)
|
||||
else:
|
||||
return AsyncHybridQuery(
|
||||
self._inner.nearest_to(AsyncQuery._query_vec_to_array(query_vector))
|
||||
)
|
||||
|
||||
|
||||
class AsyncVectorQuery(AsyncQueryBase):
|
||||
@@ -1796,3 +1902,160 @@ class AsyncVectorQuery(AsyncQueryBase):
|
||||
"""
|
||||
self._inner.bypass_vector_index()
|
||||
return self
|
||||
|
||||
def nearest_to_text(
|
||||
self, query: str, columns: Union[str, List[str]] = []
|
||||
) -> AsyncHybridQuery:
|
||||
"""
|
||||
Find the documents that are most relevant to the given text query,
|
||||
in addition to vector search.
|
||||
|
||||
This converts the vector query into a hybrid query.
|
||||
|
||||
This search will perform a full text search on the table and return
|
||||
the most relevant documents, combined with the vector query results.
|
||||
The text relevance is determined by BM25.
|
||||
|
||||
The columns to search must be with native FTS index
|
||||
(Tantivy-based can't work with this method).
|
||||
|
||||
By default, all indexed columns are searched,
|
||||
now only one column can be searched at a time.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
query: str
|
||||
The text query to search for.
|
||||
columns: str or list of str, default None
|
||||
The columns to search in. If None, all indexed columns are searched.
|
||||
For now only one column can be searched at a time.
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
return AsyncHybridQuery(
|
||||
self._inner.nearest_to_text({"query": query, "columns": columns})
|
||||
)
|
||||
|
||||
|
||||
class AsyncHybridQuery(AsyncQueryBase):
|
||||
"""
|
||||
A query builder that performs hybrid vector and full text search.
|
||||
Results are combined and reranked based on the specified reranker.
|
||||
By default, the results are reranked using the RRFReranker, which
|
||||
uses reciprocal rank fusion score for reranking.
|
||||
|
||||
To make the vector and fts results comparable, the scores are normalized.
|
||||
Instead of normalizing scores, the `normalize` parameter can be set to "rank"
|
||||
in the `rerank` method to convert the scores to ranks and then normalize them.
|
||||
"""
|
||||
|
||||
def __init__(self, inner: LanceHybridQuery):
|
||||
super().__init__(inner)
|
||||
self._inner = inner
|
||||
self._norm = "score"
|
||||
self._reranker = RRFReranker()
|
||||
|
||||
def rerank(
|
||||
self, reranker: Reranker = RRFReranker(), normalize: str = "score"
|
||||
) -> AsyncHybridQuery:
|
||||
"""
|
||||
Rerank the hybrid search results using the specified reranker. The reranker
|
||||
must be an instance of Reranker class.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
reranker: Reranker, default RRFReranker()
|
||||
The reranker to use. Must be an instance of Reranker class.
|
||||
normalize: str, default "score"
|
||||
The method to normalize the scores. Can be "rank" or "score". If "rank",
|
||||
the scores are converted to ranks and then normalized. If "score", the
|
||||
scores are normalized directly.
|
||||
Returns
|
||||
-------
|
||||
AsyncHybridQuery
|
||||
The AsyncHybridQuery object.
|
||||
"""
|
||||
if normalize not in ["rank", "score"]:
|
||||
raise ValueError("normalize must be 'rank' or 'score'.")
|
||||
if reranker and not isinstance(reranker, Reranker):
|
||||
raise ValueError("reranker must be an instance of Reranker class.")
|
||||
|
||||
self._norm = normalize
|
||||
self._reranker = reranker
|
||||
|
||||
return self
|
||||
|
||||
async def to_batches(self):
|
||||
raise NotImplementedError("to_batches not yet supported on a hybrid query")
|
||||
|
||||
async def to_arrow(self) -> pa.Table:
|
||||
fts_query = AsyncFTSQuery(self._inner.to_fts_query())
|
||||
vec_query = AsyncVectorQuery(self._inner.to_vector_query())
|
||||
|
||||
# save the row ID choice that was made on the query builder and force it
|
||||
# to actually fetch the row ids because we need this for reranking
|
||||
with_row_ids = self._inner.get_with_row_id()
|
||||
fts_query.with_row_id()
|
||||
vec_query.with_row_id()
|
||||
|
||||
fts_results, vector_results = await asyncio.gather(
|
||||
fts_query.to_arrow(),
|
||||
vec_query.to_arrow(),
|
||||
)
|
||||
|
||||
return LanceHybridQueryBuilder._combine_hybrid_results(
|
||||
fts_results=fts_results,
|
||||
vector_results=vector_results,
|
||||
norm=self._norm,
|
||||
fts_query=fts_query.get_query(),
|
||||
reranker=self._reranker,
|
||||
limit=self._inner.get_limit(),
|
||||
with_row_ids=with_row_ids,
|
||||
)
|
||||
|
||||
async def explain_plan(self, verbose: Optional[bool] = False):
|
||||
"""Return the execution plan for this query.
|
||||
|
||||
The output includes both the vector and FTS search plans.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import asyncio
|
||||
>>> from lancedb import connect_async
|
||||
>>> from lancedb.index import FTS
|
||||
>>> async def doctest_example():
|
||||
... conn = await connect_async("./.lancedb")
|
||||
... table = await conn.create_table("my_table", [{"vector": [99, 99], "text": "hello world"}])
|
||||
... await table.create_index("text", config=FTS(with_position=False))
|
||||
... query = [100, 100]
|
||||
... plan = await table.query().nearest_to([1, 2]).nearest_to_text("hello").explain_plan(True)
|
||||
... print(plan)
|
||||
>>> asyncio.run(doctest_example()) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
|
||||
Vector Search Plan:
|
||||
ProjectionExec: expr=[vector@0 as vector, text@3 as text, _distance@2 as _distance]
|
||||
Take: columns="vector, _rowid, _distance, (text)"
|
||||
CoalesceBatchesExec: target_batch_size=1024
|
||||
GlobalLimitExec: skip=0, fetch=10
|
||||
FilterExec: _distance@2 IS NOT NULL
|
||||
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], preserve_partitioning=[false]
|
||||
KNNVectorDistance: metric=l2
|
||||
LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false
|
||||
FTS Search Plan:
|
||||
LanceScan: uri=..., projection=[vector, text], row_id=false, row_addr=false, ordered=true
|
||||
|
||||
Parameters
|
||||
----------
|
||||
verbose : bool, default False
|
||||
Use a verbose output format.
|
||||
|
||||
Returns
|
||||
-------
|
||||
plan
|
||||
""" # noqa: E501
|
||||
|
||||
results = ["Vector Search Plan:"]
|
||||
results.append(await self._inner.to_vector_query().explain_plan(verbose))
|
||||
results.append("FTS Search Plan:")
|
||||
results.append(await self._inner.to_fts_query().explain_plan(verbose))
|
||||
|
||||
return "\n".join(results)
|
||||
|
||||
111
python/python/tests/test_hybrid_query.py
Normal file
111
python/python/tests/test_hybrid_query.py
Normal file
@@ -0,0 +1,111 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import lancedb
|
||||
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from lancedb.index import FTS
|
||||
from lancedb.table import AsyncTable
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def table(tmpdir_factory) -> AsyncTable:
|
||||
tmp_path = str(tmpdir_factory.mktemp("data"))
|
||||
db = await lancedb.connect_async(tmp_path)
|
||||
data = pa.table(
|
||||
{
|
||||
"text": pa.array(["a", "b", "cat", "dog"]),
|
||||
"vector": pa.array(
|
||||
[[0.1, 0.1], [2, 2], [-0.1, -0.1], [0.5, -0.5]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
}
|
||||
)
|
||||
table = await db.create_table("test", data)
|
||||
await table.create_index("text", config=FTS(with_position=False))
|
||||
return table
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_hybrid_query(table: AsyncTable):
|
||||
result = await (
|
||||
table.query().nearest_to([0.0, 0.4]).nearest_to_text("dog").limit(2).to_arrow()
|
||||
)
|
||||
assert len(result) == 2
|
||||
# ensure we get results that would match well for text and vector
|
||||
assert result["text"].to_pylist() == ["a", "dog"]
|
||||
|
||||
# ensure there is no rowid by default
|
||||
assert "_rowid" not in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_hybrid_query_with_row_ids(table: AsyncTable):
|
||||
result = await (
|
||||
table.query()
|
||||
.nearest_to([0.0, 0.4])
|
||||
.nearest_to_text("dog")
|
||||
.limit(2)
|
||||
.with_row_id()
|
||||
.to_arrow()
|
||||
)
|
||||
assert len(result) == 2
|
||||
# ensure we get results that would match well for text and vector
|
||||
assert result["text"].to_pylist() == ["a", "dog"]
|
||||
assert result["_rowid"].to_pylist() == [0, 3]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_hybrid_query_filters(table: AsyncTable):
|
||||
# test that query params are passed down from the regular builder to
|
||||
# child vector/fts builders
|
||||
result = await (
|
||||
table.query()
|
||||
.where("text not in ('a', 'dog')")
|
||||
.nearest_to([0.3, 0.3])
|
||||
.nearest_to_text("*a*")
|
||||
.limit(2)
|
||||
.to_arrow()
|
||||
)
|
||||
assert len(result) == 2
|
||||
# ensure we get results that would match well for text and vector
|
||||
assert result["text"].to_pylist() == ["cat", "b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_hybrid_query_default_limit(table: AsyncTable):
|
||||
# add 10 new rows
|
||||
new_rows = []
|
||||
for i in range(100):
|
||||
if i < 2:
|
||||
new_rows.append({"text": "close_vec", "vector": [0.1, 0.1]})
|
||||
else:
|
||||
new_rows.append({"text": "far_vec", "vector": [5 * i, 5 * i]})
|
||||
await table.add(new_rows)
|
||||
result = await (
|
||||
table.query().nearest_to_text("dog").nearest_to([0.1, 0.1]).to_arrow()
|
||||
)
|
||||
|
||||
# assert we got the default limit of 10
|
||||
assert len(result) == 10
|
||||
|
||||
# assert we got the closest vectors and the text searched for
|
||||
texts = result["text"].to_pylist()
|
||||
assert texts.count("close_vec") == 2
|
||||
assert texts.count("dog") == 1
|
||||
assert texts.count("a") == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_explain_plan(table: AsyncTable):
|
||||
plan = await (
|
||||
table.query().nearest_to_text("dog").nearest_to([0.1, 0.1]).explain_plan(True)
|
||||
)
|
||||
|
||||
assert "Vector Search Plan" in plan
|
||||
assert "KNNVectorDistance" in plan
|
||||
assert "FTS Search Plan" in plan
|
||||
assert "LanceScan" in plan
|
||||
Reference in New Issue
Block a user