feat: add analyze_plan api (#2280)

Add an analyze_plan API that executes the query and returns its plan
annotated with runtime metrics, which helps identify query I/O overhead
and diagnose query slowness.
LuQQiu
2025-03-28 14:28:52 -07:00
committed by GitHub
parent a547c523c2
commit a1d1833a40
18 changed files with 538 additions and 45 deletions
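For context, a minimal usage sketch of the new API from the synchronous client, following the docstring example added in this commit (the database path and table contents are illustrative only):

import lancedb

db = lancedb.connect("./.lancedb")
table = db.create_table("my_table", [{"vector": [99.0, 99]}])
# analyze_plan() executes the search and returns the physical plan
# annotated with runtime metrics (elapsed time, rows, bytes read, IOPS).
print(table.search([100, 100]).analyze_plan())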


@@ -96,6 +96,7 @@ class Query:
def nearest_to_text(self, query: dict) -> FTSQuery: ...
async def execute(self, max_batch_length: Optional[int]) -> RecordBatchStream: ...
async def explain_plan(self, verbose: Optional[bool]) -> str: ...
async def analyze_plan(self) -> str: ...
def to_query_request(self) -> PyQueryRequest: ...
class FTSQuery:


@@ -659,6 +659,44 @@ class LanceQueryBuilder(ABC):
""" # noqa: E501
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
def analyze_plan(self) -> str:
"""
Run the query and return its execution plan with runtime metrics.
This returns detailed metrics for each step, such as elapsed time,
rows processed, bytes read, and I/O stats. It is useful for debugging
and performance tuning.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", [{"vector": [99.0, 99]}])
>>> query = [100, 100]
>>> plan = table.search(query).analyze_plan()
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
AnalyzeExec verbose=true, metrics=[]
ProjectionExec: expr=[...], metrics=[...]
GlobalLimitExec: skip=0, fetch=10, metrics=[...]
FilterExec: _distance@2 IS NOT NULL,
metrics=[output_rows=..., elapsed_compute=...]
SortExec: TopK(fetch=10), expr=[...],
preserve_partitioning=[...],
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...]
KNNVectorDistance: metric=l2,
metrics=[output_rows=..., elapsed_compute=..., output_batches=...]
LanceScan: uri=..., projection=[vector], row_id=true,
row_addr=false, ordered=false,
metrics=[output_rows=..., elapsed_compute=...,
bytes_read=..., iops=..., requests=...]
Returns
-------
plan : str
The physical query execution plan with runtime metrics.
"""
return self._table._analyze_plan(self.to_query_object())
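As a usage note, the I/O counters surface on the LanceScan line of the report, so a quick way to pull them out of the returned string is plain text filtering; a small sketch, assuming `table` is an open table with a vector column (nothing here is part of the API itself):

plan = table.search([100, 100]).analyze_plan()
# Keep only the scan operators, which carry bytes_read / iops / requests.
for line in plan.splitlines():
    if "LanceScan" in line:
        print(line.strip())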
def vector(self, vector: Union[np.ndarray, list]) -> Self:
"""Set the vector to search for.
@@ -1941,6 +1979,15 @@ class AsyncQueryBase(object):
""" # noqa: E501
return await self._inner.explain_plan(verbose)
async def analyze_plan(self):
"""Execute the query and display with runtime metrics.
Returns
-------
plan : str
"""
return await self._inner.analyze_plan()
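A corresponding sketch for the async client; `connect_async` and `open_table` are assumed from the existing async connection API and are not part of this diff:

import asyncio
import lancedb

async def main():
    # Assumed async connection helpers; only analyze_plan() is new here.
    db = await lancedb.connect_async("./.lancedb")
    table = await db.open_table("my_table")
    # Execute the vector query and print its plan with runtime metrics.
    print(await table.query().nearest_to([0.1, 0.1]).analyze_plan())

asyncio.run(main())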
class AsyncQuery(AsyncQueryBase):
def __init__(self, inner: LanceQuery):
@@ -2510,7 +2557,7 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
Returns
-------
plan
plan : str
""" # noqa: E501
results = ["Vector Search Plan:"]
@@ -2519,3 +2566,23 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
results.append(await self._inner.to_fts_query().explain_plan(verbose))
return "\n".join(results)
async def analyze_plan(self):
"""
Execute the query and return the physical execution plan with runtime metrics.
This runs both the vector and FTS (full-text search) queries and returns
detailed metrics for each step of execution, such as rows processed,
elapsed time, and I/O stats. It is useful for debugging and
performance analysis.
Returns
-------
plan : str
"""
results = ["Vector Search Query:"]
results.append(await self._inner.to_vector_query().analyze_plan())
results.append("FTS Search Query:")
results.append(await self._inner.to_fts_query().analyze_plan())
return "\n".join(results)


@@ -371,6 +371,9 @@ class RemoteTable(Table):
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
return LOOP.run(self._table._explain_plan(query, verbose))
def _analyze_plan(self, query: Query) -> str:
return LOOP.run(self._table._analyze_plan(query))
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation.


@@ -1010,6 +1010,9 @@ class Table(ABC):
@abstractmethod
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str: ...
@abstractmethod
def _analyze_plan(self, query: Query) -> str: ...
@abstractmethod
def _do_merge(
self,
@@ -2318,6 +2321,9 @@ class LanceTable(Table):
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
return LOOP.run(self._table._explain_plan(query, verbose))
def _analyze_plan(self, query: Query) -> str:
return LOOP.run(self._table._analyze_plan(query))
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
@@ -3388,6 +3394,11 @@ class AsyncTable:
async_query = self._sync_query_to_async(query)
return await async_query.explain_plan(verbose)
async def _analyze_plan(self, query: Query) -> str:
# This method is used by the sync table
async_query = self._sync_query_to_async(query)
return await async_query.analyze_plan()
async def _do_merge(
self,
merge: LanceMergeInsertBuilder,


@@ -114,6 +114,16 @@ async def test_explain_plan(table: AsyncTable):
assert "LanceScan" in plan
@pytest.mark.asyncio
async def test_analyze_plan(table: AsyncTable):
res = await (
table.query().nearest_to_text("dog").nearest_to([0.1, 0.1]).analyze_plan()
)
assert "AnalyzeExec" in res
assert "metrics=" in res
def test_normalize_scores():
cases = [
(pa.array([0.1, 0.4]), pa.array([0.0, 1.0])),


@@ -702,6 +702,20 @@ async def test_fast_search_async(tmp_path):
assert "LanceScan" not in plan
def test_analyze_plan(table):
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
res = q.analyze_plan()
assert "AnalyzeExec" in res
assert "metrics=" in res
@pytest.mark.asyncio
async def test_analyze_plan_async(table_async: AsyncTable):
res = await table_async.query().nearest_to(pa.array([1, 2])).analyze_plan()
assert "AnalyzeExec" in res
assert "metrics=" in res
def test_explain_plan(table):
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
plan = q.explain_plan(verbose=True)