mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-26 08:20:39 +00:00
feat!: migrate FTS from tantivy to lance-index (#1483)
Lance now supports FTS, so add it to the LanceDB Python, TypeScript and Rust SDKs. For Python, we still use the tantivy-based FTS by default because the Lance FTS index currently lacks some of tantivy's features. For Python: - Support creating a Lance-based FTS index - Support specifying columns for full text search (only available for the Lance-based FTS index) For TypeScript: - Change the search method so that it accepts both a string and a vector - Support full text search For Rust: - Support full text search Others: - Update the FTS doc BREAKING CHANGE: - for Python, this renames the attached score column of FTS from "score" to "_score"; this could be a breaking change for users that rely on the scores --------- Signed-off-by: BubbleCal <bubble-cal@outlook.com>
This commit is contained in:
@@ -56,6 +56,7 @@ tokenizers = { version = "0.19.1", optional = true }
|
||||
[dev-dependencies]
|
||||
tempfile = "3.5.0"
|
||||
rand = { version = "0.8.3", features = ["small_rng"] }
|
||||
random_word = { version = "0.4.3", features = ["en"] }
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
walkdir = "2"
|
||||
aws-sdk-dynamodb = { version = "1.38.0" }
|
||||
|
||||
114
rust/lancedb/examples/full_text_search.rs
Normal file
114
rust/lancedb/examples/full_text_search.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
// Copyright 2024 Lance Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
|
||||
use futures::TryStreamExt;
|
||||
use lance_index::scalar::FullTextSearchQuery;
|
||||
use lancedb::connection::Connection;
|
||||
use lancedb::index::scalar::FtsIndexBuilder;
|
||||
use lancedb::index::Index;
|
||||
use lancedb::query::{ExecutableQuery, QueryBase};
|
||||
use lancedb::{connect, Result, Table};
|
||||
use rand::random;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
if std::path::Path::new("data").exists() {
|
||||
std::fs::remove_dir_all("data").unwrap();
|
||||
}
|
||||
let uri = "data/sample-lancedb";
|
||||
let db = connect(uri).execute().await?;
|
||||
let tbl = create_table(&db).await?;
|
||||
|
||||
create_index(&tbl).await?;
|
||||
search_index(&tbl).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_some_records() -> Result<Box<dyn RecordBatchReader + Send>> {
|
||||
const TOTAL: usize = 1000;
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("doc", DataType::Utf8, true),
|
||||
]));
|
||||
|
||||
let words = random_word::all(random_word::Lang::En)
|
||||
.iter()
|
||||
.step_by(1024)
|
||||
.take(500)
|
||||
.map(|w| *w)
|
||||
.collect::<Vec<_>>();
|
||||
let n_terms = 3;
|
||||
let batches = RecordBatchIterator::new(
|
||||
vec![RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
|
||||
Arc::new(StringArray::from_iter_values((0..TOTAL).map(|_| {
|
||||
(0..n_terms)
|
||||
.map(|_| words[random::<usize>() % words.len()])
|
||||
.collect::<Vec<_>>()
|
||||
.join(" ")
|
||||
}))),
|
||||
],
|
||||
)
|
||||
.unwrap()]
|
||||
.into_iter()
|
||||
.map(Ok),
|
||||
schema.clone(),
|
||||
);
|
||||
Ok(Box::new(batches))
|
||||
}
|
||||
|
||||
async fn create_table(db: &Connection) -> Result<Table> {
|
||||
let initial_data: Box<dyn RecordBatchReader + Send> = create_some_records()?;
|
||||
let tbl = db.create_table("my_table", initial_data).execute().await?;
|
||||
Ok(tbl)
|
||||
}
|
||||
|
||||
async fn create_index(table: &Table) -> Result<()> {
|
||||
table
|
||||
.create_index(&["doc"], Index::FTS(FtsIndexBuilder::default()))
|
||||
.execute()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn search_index(table: &Table) -> Result<()> {
|
||||
let words = random_word::all(random_word::Lang::En)
|
||||
.iter()
|
||||
.step_by(1024)
|
||||
.take(500)
|
||||
.map(|w| *w)
|
||||
.collect::<Vec<_>>();
|
||||
let query = words[0].to_owned();
|
||||
println!("Searching for: {}", query);
|
||||
|
||||
let mut results = table
|
||||
.query()
|
||||
.full_text_search(FullTextSearchQuery::new(words[0].to_owned()))
|
||||
.select(lancedb::query::Select::Columns(vec!["doc".to_owned()]))
|
||||
.limit(10)
|
||||
.execute()
|
||||
.await?;
|
||||
while let Some(batch) = results.try_next().await? {
|
||||
println!("{:?}", batch);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1217,7 +1217,7 @@ mod tests {
|
||||
|
||||
let tbl = db
|
||||
.create_table("v2_test", make_data())
|
||||
.use_legacy_format(false)
|
||||
.data_storage_version(LanceFileVersion::Stable)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use scalar::FtsIndexBuilder;
|
||||
use serde::Deserialize;
|
||||
use serde_with::skip_serializing_none;
|
||||
|
||||
@@ -30,6 +31,7 @@ pub mod vector;
|
||||
pub enum Index {
|
||||
Auto,
|
||||
BTree(BTreeIndexBuilder),
|
||||
FTS(FtsIndexBuilder),
|
||||
IvfPq(IvfPqIndexBuilder),
|
||||
IvfHnswPq(IvfHnswPqIndexBuilder),
|
||||
IvfHnswSq(IvfHnswSqIndexBuilder),
|
||||
|
||||
@@ -28,3 +28,13 @@
|
||||
pub struct BTreeIndexBuilder {}
|
||||
|
||||
impl BTreeIndexBuilder {}
|
||||
|
||||
/// Builder for a full text search (FTS) index.
///
/// An FTS index is built on a string column and makes that column searchable
/// with full text queries. The builder currently has no configurable options.
#[derive(Clone, Debug, Default)]
pub struct FtsIndexBuilder {}

impl FtsIndexBuilder {}
|
||||
|
||||
pub use lance_index::scalar::FullTextSearchQuery;
|
||||
|
||||
@@ -21,6 +21,7 @@ use datafusion_physical_plan::ExecutionPlan;
|
||||
use half::f16;
|
||||
use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance_datafusion::exec::execute_plan;
|
||||
use lance_index::scalar::FullTextSearchQuery;
|
||||
|
||||
use crate::arrow::SendableRecordBatchStream;
|
||||
use crate::error::{Error, Result};
|
||||
@@ -351,6 +352,17 @@ pub trait QueryBase {
|
||||
/// on the filter column(s).
|
||||
fn only_if(self, filter: impl AsRef<str>) -> Self;
|
||||
|
||||
/// Perform a full text search on the table.
|
||||
///
|
||||
/// The results will be returned in order of BM25 scores.
|
||||
///
|
||||
/// This method is only valid on tables that have a full text search index.
|
||||
///
|
||||
/// The query is taken by value. NOTE(review): the example passes an owned
/// `String`, matching how the bundled example calls `FullTextSearchQuery::new`;
/// confirm against the constructor's signature.
///
/// ```ignore
|
||||
/// query.full_text_search(FullTextSearchQuery::new("hello world".to_owned()))
|
||||
/// ```
|
||||
fn full_text_search(self, query: FullTextSearchQuery) -> Self;
|
||||
|
||||
/// Return only the specified columns.
|
||||
///
|
||||
/// By default a query will return all columns from the table. However, this can have
|
||||
@@ -401,6 +413,11 @@ impl<T: HasQuery> QueryBase for T {
|
||||
self
|
||||
}
|
||||
|
||||
/// Stores `query` as the full text search of the underlying query,
/// replacing any value set earlier, and returns the builder.
fn full_text_search(mut self, query: FullTextSearchQuery) -> Self {
    let base = self.mut_query();
    base.full_text_search = Some(query);
    self
}
|
||||
|
||||
fn select(mut self, select: Select) -> Self {
|
||||
self.mut_query().select = select;
|
||||
self
|
||||
@@ -502,8 +519,13 @@ pub struct Query {
|
||||
|
||||
/// limit the number of rows to return.
|
||||
pub(crate) limit: Option<usize>,
|
||||
|
||||
/// Apply filter to the returned rows.
|
||||
pub(crate) filter: Option<String>,
|
||||
|
||||
/// Perform a full text search on the table.
|
||||
pub(crate) full_text_search: Option<FullTextSearchQuery>,
|
||||
|
||||
/// Select column projection.
|
||||
pub(crate) select: Select,
|
||||
|
||||
@@ -520,6 +542,7 @@ impl Query {
|
||||
parent,
|
||||
limit: None,
|
||||
filter: None,
|
||||
full_text_search: None,
|
||||
select: Select::All,
|
||||
fast_search: false,
|
||||
}
|
||||
|
||||
@@ -1054,6 +1054,10 @@ impl NativeTable {
|
||||
)
|
||||
}
|
||||
|
||||
fn supported_fts_data_type(dtype: &DataType) -> bool {
|
||||
matches!(dtype, DataType::Utf8 | DataType::LargeUtf8)
|
||||
}
|
||||
|
||||
fn supported_vector_data_type(dtype: &DataType) -> bool {
|
||||
match dtype {
|
||||
DataType::FixedSizeList(inner, _) => DataType::is_floating(inner.data_type()),
|
||||
@@ -1524,6 +1528,33 @@ impl NativeTable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_fts_index(&self, field: &Field, opts: IndexBuilder) -> Result<()> {
|
||||
if !Self::supported_fts_data_type(field.data_type()) {
|
||||
return Err(Error::Schema {
|
||||
message: format!(
|
||||
"A FTS index cannot be created on the field `{}` which has data type {}",
|
||||
field.name(),
|
||||
field.data_type()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let mut dataset = self.dataset.get_mut().await?;
|
||||
let lance_idx_params = lance_index::scalar::ScalarIndexParams {
|
||||
force_index_type: Some(lance_index::scalar::ScalarIndexType::Inverted),
|
||||
};
|
||||
dataset
|
||||
.create_index(
|
||||
&[field.name()],
|
||||
IndexType::Scalar,
|
||||
None,
|
||||
&lance_idx_params,
|
||||
opts.replace,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn generic_query(
|
||||
&self,
|
||||
query: &VectorQuery,
|
||||
@@ -1659,6 +1690,7 @@ impl TableInternal for NativeTable {
|
||||
match opts.index {
|
||||
Index::Auto => self.create_auto_index(field, opts).await,
|
||||
Index::BTree(_) => self.create_btree_index(field, opts).await,
|
||||
Index::FTS(_) => self.create_fts_index(field, opts).await,
|
||||
Index::IvfPq(ivf_pq) => self.create_ivf_pq_index(ivf_pq, field, opts.replace).await,
|
||||
Index::IvfHnswPq(ivf_hnsw_pq) => {
|
||||
self.create_ivf_hnsw_pq_index(ivf_hnsw_pq, field, opts.replace)
|
||||
@@ -1789,6 +1821,10 @@ impl TableInternal for NativeTable {
|
||||
scanner.filter(filter)?;
|
||||
}
|
||||
|
||||
if let Some(fts) = &query.base.full_text_search {
|
||||
scanner.full_text_search(fts.clone())?;
|
||||
}
|
||||
|
||||
if let Some(refine_factor) = query.refine_factor {
|
||||
scanner.refine(refine_factor);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user