feat: add the explain_plan function (#1328)

It's useful to see the underlying query plan for debugging purposes.
This exposes LanceScanner's `explain_plan` function. Addresses #1288

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
This commit is contained in:
Nuvic
2024-07-02 11:10:01 -07:00
committed by GitHub
parent 12b3c87964
commit 46c6ff889d
9 changed files with 226 additions and 15 deletions

View File

@@ -745,3 +745,27 @@ describe("table.search", () => {
expect(results[0].text).toBe(data[1].text);
});
});
// Checks that a vector query can report its execution plan as text.
describe("when calling explainPlan", () => {
  let tmpDir: tmp.DirResult;
  let table: Table;

  beforeEach(async () => {
    // Every test works against a fresh on-disk database that holds a single
    // row with a two-dimensional vector.
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
    const db = await connect(tmpDir.name);
    table = await db.createTable("vectors", [{ id: 1, vector: [0.1, 0.2] }]);
  });

  afterEach(() => {
    tmpDir.removeCallback();
  });

  it("retrieves query plan", async () => {
    // Random query vector with the same dimension as the stored vectors.
    const queryVec: number[] = Array.from({ length: 2 }, () => Math.random());
    const plan = await table.query().nearestTo(queryVec).explainPlan(true);
    expect(plan).toMatch("KNN");
  });
});

View File

@@ -226,6 +226,24 @@ export class QueryBase<
const tbl = await this.toArrow(options);
return tbl.toArray();
}
/**
 * Produces a textual description of the execution plan for this query.
 *
 * The request is delegated to the native query object held in `inner`.
 *
 * @example
 * import * as lancedb from "@lancedb/lancedb"
 * const db = await lancedb.connect("./.lancedb");
 * const table = await db.createTable("my_table", [
 *   { vector: [1.1, 0.9], id: "1" },
 * ]);
 * const plan = await table.query().nearestTo([0.5, 0.2]).explainPlan();
 *
 * @param verbose - When true, asks for a more detailed explanation. Defaults to false.
 * @returns A Promise that resolves to the query plan as a string.
 */
async explainPlan(verbose = false): Promise<string> {
  const plan: string = await this.inner.explainPlan(verbose);
  return plan;
}
}
/**

View File

@@ -80,6 +80,13 @@ impl Query {
})?;
Ok(RecordBatchIterator::new(inner_stream))
}
#[napi]
pub async fn explain_plan(&self, verbose: bool) -> napi::Result<String> {
self.inner.explain_plan(verbose).await.map_err(|e| {
napi::Error::from_reason(format!("Failed to retrieve the query plan: {}", e))
})
}
}
#[napi]
@@ -154,4 +161,11 @@ impl VectorQuery {
})?;
Ok(RecordBatchIterator::new(inner_stream))
}
#[napi]
pub async fn explain_plan(&self, verbose: bool) -> napi::Result<String> {
self.inner.explain_plan(verbose).await.map_err(|e| {
napi::Error::from_reason(format!("Failed to retrieve the query plan: {}", e))
})
}
}

View File

@@ -417,6 +417,38 @@ class LanceQueryBuilder(ABC):
self._with_row_id = with_row_id
return self
def explain_plan(self, verbose: Optional[bool] = False) -> str:
    """Return the execution plan for this query.

    The plan is obtained from a Lance scanner configured with this
    builder's vector column and query vector.

    Examples
    --------
    >>> import lancedb
    >>> db = lancedb.connect("./.lancedb")
    >>> table = db.create_table("my_table", [{"vector": [99, 99]}])
    >>> query = [100, 100]
    >>> plan = table.search(query).explain_plan(True)
    >>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    Projection: fields=[vector, _distance]
      KNNFlat: k=10 metric=l2
        LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false

    Parameters
    ----------
    verbose : bool, default False
        Use a verbose output format. ``None`` is treated as ``False``.

    Returns
    -------
    plan : str
    """  # noqa: E501
    # The signature admits ``None``; normalise it so the scanner is always
    # handed a real boolean.
    verbose = bool(verbose)
    ds = self._table.to_lance()
    scanner = ds.scanner(
        nearest={
            "column": self._vector_column,
            "q": self._query,
        },
    )
    return scanner.explain_plan(verbose)
class LanceVectorQueryBuilder(LanceQueryBuilder):
"""
@@ -1166,6 +1198,35 @@ class AsyncQueryBase(object):
"""
return (await self.to_arrow()).to_pandas()
async def explain_plan(self, verbose: Optional[bool] = False) -> str:
    """Return the execution plan for this query.

    Examples
    --------
    >>> import asyncio
    >>> from lancedb import connect_async
    >>> async def doctest_example():
    ...     conn = await connect_async("./.lancedb")
    ...     table = await conn.create_table("my_table", [{"vector": [99, 99]}])
    ...     query = [100, 100]
    ...     plan = await table.query().nearest_to(query).explain_plan(True)
    ...     print(plan)
    >>> asyncio.run(doctest_example()) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    Projection: fields=[vector, _distance]
      KNNFlat: k=10 metric=l2
        LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false

    Parameters
    ----------
    verbose : bool, default False
        Use a verbose output format. ``None`` is treated as ``False``.

    Returns
    -------
    plan : str
    """  # noqa: E501
    # The native ``explain_plan`` is declared with a plain ``bool`` parameter,
    # while this signature admits ``None``; normalise before delegating.
    return await self._inner.explain_plan(bool(verbose))
class AsyncQuery(AsyncQueryBase):
def __init__(self, inner: LanceQuery):

View File

@@ -333,3 +333,15 @@ async def test_query_to_pandas_async(table_async: AsyncTable):
df = await table_async.query().where("id < 0").to_pandas()
assert df.shape == (0, 4)
def test_explain_plan(table):
    """A verbose plan for a vector search should mention a KNN stage."""
    builder = LanceVectorQueryBuilder(table, [0, 0], "vector")
    assert "KNN" in builder.explain_plan(verbose=True)
@pytest.mark.asyncio
async def test_explain_plan_async(table_async: AsyncTable):
    """A verbose plan for an async nearest-neighbour query should mention KNN."""
    query_vector = pa.array([1, 2])
    vector_query = table_async.query().nearest_to(query_vector)
    plan = await vector_query.explain_plan(True)
    assert "KNN" in plan

View File

@@ -19,6 +19,7 @@ use lancedb::query::QueryExecutionOptions;
use lancedb::query::{
ExecutableQuery, Query as LanceDbQuery, QueryBase, Select, VectorQuery as LanceDbVectorQuery,
};
use pyo3::exceptions::PyRuntimeError;
use pyo3::pyclass;
use pyo3::pymethods;
use pyo3::PyAny;
@@ -73,6 +74,16 @@ impl Query {
Ok(RecordBatchStream::new(inner_stream))
})
}
// Hands back a Python awaitable that resolves to the query plan text.
//
// The wrapped query is cloned so the future owns its own copy and does not
// borrow from `self_`. Any error is converted into a Python `RuntimeError`
// carrying the error's string form.
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<&PyAny> {
    let query = self_.inner.clone();
    let py = self_.py();
    future_into_py(py, async move {
        let outcome = query.explain_plan(verbose).await;
        outcome.map_err(|err| PyRuntimeError::new_err(err.to_string()))
    })
}
}
#[pyclass]
@@ -131,4 +142,14 @@ impl VectorQuery {
Ok(RecordBatchStream::new(inner_stream))
})
}
// Hands back a Python awaitable that resolves to the vector query plan text.
//
// The wrapped vector query is cloned so the future owns its own copy and does
// not borrow from `self_`. Any error is converted into a Python
// `RuntimeError` carrying the error's string form.
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<&PyAny> {
    let vector_query = self_.inner.clone();
    let py = self_.py();
    future_into_py(py, async move {
        match vector_query.explain_plan(verbose).await {
            Ok(plan) => Ok(plan),
            Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
        }
    })
}
}

View File

@@ -465,6 +465,8 @@ pub trait ExecutableQuery {
&self,
options: QueryExecutionOptions,
) -> impl Future<Output = Result<SendableRecordBatchStream>> + Send;
/// Return the execution plan for this query as a string.
///
/// `verbose` asks the implementation for a more detailed plan. The returned
/// future is `Send` and resolves to the plan text or an error.
fn explain_plan(&self, verbose: bool) -> impl Future<Output = Result<String>> + Send;
}
/// A builder for LanceDB queries.
@@ -572,6 +574,12 @@ impl ExecutableQuery for Query {
self.parent.clone().plain_query(self, options).await?,
))
}
// The parent table's `explain_plan` takes a `VectorQuery`, so a copy of this
// query is converted with `into_vector` before being handed over.
async fn explain_plan(&self, verbose: bool) -> Result<String> {
    let as_vector_query = self.clone().into_vector();
    self.parent.explain_plan(&as_vector_query, verbose).await
}
}
/// A builder for vector searches
@@ -752,6 +760,10 @@ impl ExecutableQuery for VectorQuery {
)?),
))
}
// Delegates straight to the parent table of the underlying base query.
async fn explain_plan(&self, verbose: bool) -> Result<String> {
    let table = &self.base.parent;
    table.explain_plan(self, verbose).await
}
}
impl HasQuery for VectorQuery {

View File

@@ -1,10 +1,12 @@
use std::sync::Arc;
use crate::table::dataset::DatasetReadGuard;
use arrow_array::RecordBatchReader;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_physical_plan::ExecutionPlan;
use lance::dataset::{scanner::DatasetRecordBatchStream, ColumnAlteration, NewColumnTransform};
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
use lance::dataset::{ColumnAlteration, NewColumnTransform};
use crate::{
connection::NoData,
@@ -74,6 +76,14 @@ impl TableInternal for RemoteTable {
) -> Result<()> {
todo!()
}
// Placeholder: building a scanner is not implemented for `RemoteTable`.
// Calling this panics through `todo!()`; all arguments are ignored.
async fn build_plan(
&self,
_ds_ref: &DatasetReadGuard,
_query: &VectorQuery,
_options: Option<QueryExecutionOptions>,
) -> Result<Scanner> {
todo!()
}
async fn create_plan(
&self,
_query: &VectorQuery,
@@ -81,6 +91,9 @@ impl TableInternal for RemoteTable {
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
// Placeholder: plan explanation is not implemented for `RemoteTable`.
// Calling this panics through `todo!()`; both arguments are ignored.
async fn explain_plan(&self, _query: &VectorQuery, _verbose: bool) -> Result<String> {
todo!()
}
async fn plain_query(
&self,
_query: &Query,

View File

@@ -65,7 +65,7 @@ use crate::query::{
};
use crate::utils::{default_vector_column, PatchReadParam, PatchWriteParam};
use self::dataset::DatasetConsistencyWrapper;
use self::dataset::{DatasetConsistencyWrapper, DatasetReadGuard};
use self::merge::MergeInsertBuilder;
pub(crate) mod dataset;
@@ -369,6 +369,12 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
async fn schema(&self) -> Result<SchemaRef>;
/// Count the number of rows in this table.
async fn count_rows(&self, filter: Option<String>) -> Result<usize>;
/// Build a Lance `Scanner` for `query` against the dataset behind `ds_ref`.
///
/// `options` is optional so that callers without execution options (such as
/// plan explanation) can pass `None`.
async fn build_plan(
&self,
ds_ref: &DatasetReadGuard,
query: &VectorQuery,
options: Option<QueryExecutionOptions>,
) -> Result<Scanner>;
async fn create_plan(
&self,
query: &VectorQuery,
@@ -379,6 +385,7 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
query: &Query,
options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream>;
/// Return the execution plan for `query` as a string.
///
/// `verbose` requests a more detailed plan.
async fn explain_plan(&self, query: &VectorQuery, verbose: bool) -> Result<String>;
async fn add(
&self,
add: AddDataBuilder<NoData>,
@@ -1667,12 +1674,12 @@ impl TableInternal for NativeTable {
Ok(())
}
async fn create_plan(
async fn build_plan(
&self,
ds_ref: &DatasetReadGuard,
query: &VectorQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let ds_ref = self.dataset.get().await?;
options: Option<QueryExecutionOptions>,
) -> Result<Scanner> {
let mut scanner: Scanner = ds_ref.scan();
if let Some(query_vector) = query.query_vector.as_ref() {
@@ -1684,9 +1691,11 @@ impl TableInternal for NativeTable {
let arrow_schema = Schema::from(ds_ref.schema());
default_vector_column(&arrow_schema, Some(query_vector.len() as i32))?
};
let field = ds_ref.schema().field(&column).ok_or(Error::Schema {
message: format!("Column {} not found in dataset schema", column),
})?;
if let arrow_schema::DataType::FixedSizeList(f, dim) = field.data_type() {
if !f.data_type().is_floating() {
return Err(Error::InvalidInput {
@@ -1698,16 +1707,17 @@ impl TableInternal for NativeTable {
}
if dim != query_vector.len() as i32 {
return Err(Error::InvalidInput {
message: format!(
"The dimension of the query vector does not match with the dimension of the vector column '{}': \
query dim={}, expected vector dim={}",
column,
query_vector.len(),
dim,
),
});
message: format!(
"The dimension of the query vector does not match with the dimension of the vector column '{}': \
query dim={}, expected vector dim={}",
column,
query_vector.len(),
dim,
),
});
}
}
let query_vector = query_vector.as_primitive::<Float32Type>();
scanner.nearest(
&column,
@@ -1718,10 +1728,26 @@ impl TableInternal for NativeTable {
// If there is no vector query, it's ok to not have a limit
scanner.limit(query.base.limit.map(|limit| limit as i64), None)?;
}
scanner.nprobs(query.nprobes);
scanner.use_index(query.use_index);
scanner.prefilter(query.prefilter);
scanner.batch_size(options.max_batch_length as usize);
if let Some(opts) = options {
scanner.batch_size(opts.max_batch_length as usize);
}
Ok(scanner)
}
async fn create_plan(
&self,
query: &VectorQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let ds_ref = self.dataset.get().await?;
let mut scanner = self.build_plan(&ds_ref, query, Some(options)).await?;
match &query.base.select {
Select::Columns(select) => {
@@ -1756,6 +1782,16 @@ impl TableInternal for NativeTable {
.await
}
// Builds the scanner for `query` and asks it to describe its plan.
async fn explain_plan(&self, query: &VectorQuery, verbose: bool) -> Result<String> {
    let dataset = self.dataset.get().await?;
    // No execution options are supplied here, so `build_plan` gets `None`.
    let plan_scanner = self.build_plan(&dataset, query, None).await?;
    Ok(plan_scanner.explain_plan(verbose).await?)
}
async fn merge_insert(
&self,
params: MergeInsertBuilder,