feat: add list_indices to the async api (#1074)

This commit is contained in:
Weston Pace
2024-03-12 14:41:21 -07:00
committed by GitHub
parent d744972f2f
commit 4dc7497547
14 changed files with 233 additions and 16 deletions

View File

@@ -121,7 +121,12 @@ describe("When creating an index", () => {
// check index directory
const indexDir = path.join(tmpDir.name, "test.lance", "_indices");
expect(fs.readdirSync(indexDir)).toHaveLength(1);
// TODO: check index type.
const indices = await tbl.listIndices();
expect(indices.length).toBe(1);
expect(indices[0]).toEqual({
indexType: "IvfPq",
columns: ["vec"],
});
// Search without specifying the column
const rst = await tbl.query().nearestTo(queryVec).limit(2).toArrow();

View File

@@ -3,6 +3,18 @@
/* auto-generated by NAPI-RS */
/** A description of an index currently configured on a column */
export interface IndexConfig {
/** The type of the index */
indexType: string
/**
* The columns in the index
*
* Currently this is always an array of size 1. In the future there may
* be more columns to represent composite indices.
*/
columns: Array<string>
}
/**
* A definition of a column alteration. The alteration changes the column at
* `path` to have the new name `name`, to be nullable if `nullable` is true,
@@ -122,4 +134,5 @@ export class Table {
checkout(version: number): Promise<void>
checkoutLatest(): Promise<void>
restore(): Promise<void>
listIndices(): Promise<Array<IndexConfig>>
}

View File

@@ -16,12 +16,14 @@ import { Schema, tableFromIPC } from "apache-arrow";
import {
AddColumnsSql,
ColumnAlteration,
IndexConfig,
Table as _NativeTable,
} from "./native";
import { Query } from "./query";
import { IndexOptions } from "./indices";
import { Data, fromDataToBuffer } from "./arrow";
export { IndexConfig } from "./native";
/**
* Options for adding data to a table.
*/
@@ -342,4 +344,11 @@ export class Table {
async restore(): Promise<void> {
await this.inner.restore();
}
/**
 * List the indices that currently exist on this table.
 *
 * @returns one {@link IndexConfig} per index, giving the index type and the
 * columns it covers.
 */
async listIndices(): Promise<IndexConfig[]> {
  // The native table does the actual work; this wrapper only forwards to it.
  const indices = await this.inner.listIndices();
  return indices;
}
}

View File

@@ -257,6 +257,40 @@ impl Table {
pub async fn restore(&self) -> napi::Result<()> {
self.inner_ref()?.restore().await.default_error()
}
#[napi]
pub async fn list_indices(&self) -> napi::Result<Vec<IndexConfig>> {
    // Fetch the index descriptions from the wrapped table; a failure is
    // converted with `default_error` and returned to the caller.
    let configs = self.inner_ref()?.list_indices().await.default_error()?;
    // Turn each core description into the binding-level `IndexConfig`.
    let converted: Vec<IndexConfig> = configs.into_iter().map(IndexConfig::from).collect();
    Ok(converted)
}
}
// Binding-level mirror of `lancedb::index::IndexConfig`. Values are produced
// by the `From` conversion in this file, which stores the index type as a
// string instead of an enum.
// NOTE(review): the `///` text below appears to be copied into the NAPI-RS
// generated TypeScript declarations — regenerate them after editing; confirm.
#[napi(object)]
/// A description of an index currently configured on a column
pub struct IndexConfig {
/// The type of the index
// Holds the `Debug` rendering of the core index type (see the `From` impl).
pub index_type: String,
/// The columns in the index
///
/// Currently this is always an array of size 1. In the future there may
/// be more columns to represent composite indices.
pub columns: Vec<String>,
}
impl From<lancedb::index::IndexConfig> for IndexConfig {
    // Converts the core description into the binding-level one.
    fn from(value: lancedb::index::IndexConfig) -> Self {
        Self {
            // The index type is exposed as its `Debug` rendering.
            index_type: format!("{:?}", value.index_type),
            // Column names are moved over unchanged.
            columns: value.columns,
        }
    }
}
/// A definition of a column alteration. The alteration changes the column at

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Dict, List, Optional
import pyarrow as pa
@@ -39,6 +39,11 @@ class Table:
async def checkout(self, version): ...
async def checkout_latest(self): ...
async def restore(self): ...
# Awaitable; resolves to a list of IndexConfig descriptions.
async def list_indices(self) -> List[IndexConfig]: ...
# Description of an index, as returned by Table.list_indices.
class IndexConfig:
# Name of the index type, e.g. "BTree" or "IvfPq".
index_type: str
# Names of the columns covered by the index.
columns: List[str]
async def connect(
uri: str,

View File

@@ -3,6 +3,9 @@ from typing import Optional
from ._lancedb import (
Index as LanceDbIndex,
)
from ._lancedb import (
IndexConfig,
)
class BTree(object):
@@ -155,3 +158,6 @@ class IvfPq(object):
max_iterations=max_iterations,
sample_rate=sample_rate,
)
__all__ = ["BTree", "IvfPq", "IndexConfig"]

View File

@@ -60,7 +60,7 @@ if TYPE_CHECKING:
from ._lancedb import Table as LanceDBTable
from .db import LanceDBConnection
from .index import BTree, IvfPq
from .index import BTree, IndexConfig, IvfPq
pd = safe_import_pandas()
@@ -2417,3 +2417,9 @@ class AsyncTable:
out state and the read_consistency_interval, if any, will apply.
"""
await self._inner.restore()
async def list_indices(self) -> "List[IndexConfig]":
    """
    List all indices that have been created with `create_index`.

    Returns a list of `IndexConfig` objects, each exposing the `index_type`
    and `columns` of one index. The call is forwarded to the native table.
    """
    # The annotation is a string so that nothing is evaluated at definition
    # time: IndexConfig is only imported for type checking in this module.
    return await self._inner.list_indices()

View File

@@ -41,6 +41,10 @@ async def test_create_scalar_index(some_table: AsyncTable):
await some_table.create_index("id")
# Can recreate if replace=True
await some_table.create_index("id", replace=True)
indices = await some_table.list_indices()
assert len(indices) == 1
assert indices[0].index_type == "BTree"
assert indices[0].columns == ["id"]
# Can't recreate if replace=False
with pytest.raises(RuntimeError, match="already exists"):
await some_table.create_index("id", replace=False)
@@ -59,3 +63,7 @@ async def test_create_vector_index(some_table: AsyncTable):
await some_table.create_index("vector", replace=False)
# Can also specify index type
await some_table.create_index("vector", config=IvfPq(num_partitions=100))
indices = await some_table.list_indices()
assert len(indices) == 1
assert indices[0].index_type == "IvfPq"
assert indices[0].columns == ["vector"]

View File

@@ -85,3 +85,25 @@ impl Index {
})
}
}
// Python-facing mirror of `lancedb::index::IndexConfig`. Values are produced
// by the `From` conversion in this file, which stores the index type as a
// string instead of an enum.
// NOTE(review): `get_all` presumably exposes every field as a Python
// attribute getter (the Python tests read `index_type` and `columns`) — confirm.
#[pyclass(get_all)]
/// A description of an index currently configured on a column
pub struct IndexConfig {
/// The type of the index
// Holds the `Debug` rendering of the core index type (see the `From` impl).
pub index_type: String,
/// The columns in the index
///
/// Currently this is always a list of size 1. In the future there may
/// be more columns to represent composite indices.
pub columns: Vec<String>,
}
impl From<lancedb::index::IndexConfig> for IndexConfig {
    // Converts the core description into the Python-facing one.
    fn from(value: lancedb::index::IndexConfig) -> Self {
        Self {
            // The index type is exposed as its `Debug` rendering.
            index_type: format!("{:?}", value.index_type),
            // Column names are moved over unchanged.
            columns: value.columns,
        }
    }
}

View File

@@ -14,7 +14,7 @@
use connection::{connect, Connection};
use env_logger::Env;
use index::Index;
use index::{Index, IndexConfig};
use pyo3::{pymodule, types::PyModule, wrap_pyfunction, PyResult, Python};
use table::Table;
@@ -33,6 +33,7 @@ pub fn _lancedb(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Connection>()?;
m.add_class::<Table>()?;
m.add_class::<Index>()?;
m.add_class::<IndexConfig>()?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
Ok(())

View File

@@ -11,7 +11,10 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use crate::{error::PythonErrorExt, index::Index};
use crate::{
error::PythonErrorExt,
index::{Index, IndexConfig},
};
#[pyclass]
pub struct Table {
@@ -127,6 +130,19 @@ impl Table {
})
}
pub fn list_indices(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
    // Clone the inner table so the future owns everything it needs and does
    // not borrow from `self_`.
    let inner = self_.inner_ref()?.clone();
    future_into_py(self_.py(), async move {
        // Fetch the core index descriptions, converting any failure with
        // `infer_error`.
        let configs = inner.list_indices().await.infer_error()?;
        // Wrap each description in the Python-facing `IndexConfig`.
        Ok(configs
            .into_iter()
            .map(IndexConfig::from)
            .collect::<Vec<_>>())
    })
}
pub fn __repr__(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),

View File

@@ -61,3 +61,20 @@ impl IndexBuilder {
self.parent.clone().create_index(self).await
}
}
/// The kind of index configured on a column
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexType {
    /// A vector index (IVF with product quantization)
    IvfPq,
    /// A scalar BTree index
    BTree,
}

/// A description of an index currently configured on a column
// Derives match `IndexType` so descriptions can be logged, cloned and
// compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexConfig {
    /// The type of the index
    pub index_type: IndexType,
    /// The columns in the index
    ///
    /// Currently this is always a Vec of size 1. In the future there may
    /// be more columns to represent composite indices.
    pub columns: Vec<String>,
}

View File

@@ -5,7 +5,7 @@ use lance::dataset::{scanner::DatasetRecordBatchStream, ColumnAlteration, NewCol
use crate::{
error::Result,
index::IndexBuilder,
index::{IndexBuilder, IndexConfig},
query::Query,
table::{
merge::MergeInsertBuilder, AddDataBuilder, NativeTable, OptimizeAction, OptimizeStats,
@@ -101,4 +101,7 @@ impl TableInternal for RemoteTable {
async fn drop_columns(&self, _columns: &[&str]) -> Result<()> {
todo!()
}
// Not implemented for remote tables yet: the body is `todo!()`, so calling
// this panics instead of returning an error.
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
todo!()
}
}

View File

@@ -42,6 +42,7 @@ use snafu::whatever;
use crate::error::{Error, Result};
use crate::index::vector::{IvfPqIndexBuilder, VectorIndex, VectorIndexStatistics};
use crate::index::IndexConfig;
use crate::index::{
vector::{suggested_num_partitions, suggested_num_sub_vectors},
Index, IndexBuilder,
@@ -233,6 +234,7 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
async fn delete(&self, predicate: &str) -> Result<()>;
async fn update(&self, update: UpdateBuilder) -> Result<()>;
async fn create_index(&self, index: IndexBuilder) -> Result<()>;
async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
async fn merge_insert(
&self,
params: MergeInsertBuilder,
@@ -674,6 +676,11 @@ impl Table {
pub async fn restore(&self) -> Result<()> {
self.inner.restore().await
}
/// List all indices that have been created with [`Self::create_index`]
///
/// Each returned [`IndexConfig`] gives the type of an index and the columns
/// it covers. The call is forwarded to the underlying table implementation
/// and its result is returned unchanged.
pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
self.inner.list_indices().await
}
}
impl From<NativeTable> for Table {
@@ -1398,6 +1405,25 @@ impl TableInternal for NativeTable {
self.dataset.get_mut().await?.drop_columns(columns).await?;
Ok(())
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
let dataset = self.dataset.get().await?;
let indices = dataset.load_indices().await?;
indices.iter().map(|idx| {
let mut is_vector = false;
let mut columns = Vec::with_capacity(idx.fields.len());
for field_id in &idx.fields {
let field = dataset.schema().field_by_id(*field_id).ok_or_else(|| Error::Runtime { message: format!("The index with name {} and uuid {} referenced a field with id {} which does not exist in the schema", idx.name, idx.uuid, field_id) })?;
if field.data_type().is_nested() {
// Temporary hack to determine if an index is scalar or vector
// Should be removed in https://github.com/lancedb/lance/issues/2039
is_vector = true;
}
columns.push(field.name.clone());
}
Ok(IndexConfig { index_type: if is_vector { crate::index::IndexType::IvfPq } else { crate::index::IndexType::BTree }, columns })
}).collect::<Result<Vec<_>>>()
}
}
#[cfg(test)]
@@ -1423,6 +1449,7 @@ mod tests {
use crate::connect;
use crate::connection::ConnectBuilder;
use crate::index::scalar::BTreeIndexBuilder;
use super::*;
@@ -2068,16 +2095,11 @@ mod tests {
.await
.unwrap();
assert_eq!(
table
.as_native()
.unwrap()
.load_indices()
.await
.unwrap()
.len(),
1
);
let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 1);
let index = index_configs.into_iter().next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::IvfPq);
assert_eq!(index.columns, vec!["embeddings".to_string()]);
assert_eq!(table.count_rows(None).await.unwrap(), 512);
assert_eq!(table.name(), "test");
@@ -2129,6 +2151,56 @@ mod tests {
RecordBatchIterator::new(vec![batch], schema)
}
#[tokio::test]
async fn test_create_scalar_index() {
    // After each index creation the table must report exactly one BTree
    // index covering column "i".
    fn assert_single_btree_on_i(configs: &[crate::index::IndexConfig]) {
        assert_eq!(configs.len(), 1);
        let config = &configs[0];
        assert_eq!(config.index_type, crate::index::IndexType::BTree);
        assert_eq!(config.columns, vec!["i".to_string()]);
    }

    let tmp_dir = tempdir().unwrap();
    let uri = tmp_dir.path().to_str().unwrap();

    // A single-row table with one non-nullable Int32 column.
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap();

    let conn = ConnectBuilder::new(uri).execute().await.unwrap();
    let table = conn
        .create_table(
            "my_table",
            Box::new(RecordBatchIterator::new(
                vec![Ok(batch.clone())],
                batch.schema(),
            )),
        )
        .execute()
        .await
        .unwrap();

    // Can create an index on a scalar column (will default to btree)
    table
        .create_index(&["i"], Index::Auto)
        .execute()
        .await
        .unwrap();
    let after_auto = table.list_indices().await.unwrap();
    assert_single_btree_on_i(&after_auto);

    // Can also specify btree
    table
        .create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
        .execute()
        .await
        .unwrap();
    let after_explicit = table.list_indices().await.unwrap();
    assert_single_btree_on_i(&after_explicit);
}
#[tokio::test]
async fn test_read_consistency_interval() {
let intervals = vec![