Compare commits

..

1 Commits

Author SHA1 Message Date
BubbleCal
4b24e28539 Add skip_merge option for FTS indexes 2026-01-20 21:32:41 +08:00
16 changed files with 55 additions and 170 deletions

View File

@@ -558,6 +558,12 @@ export interface FtsOptions {
* whether to only index the prefix of the token for ngram tokenizer
*/
prefixOnly?: boolean;
/**
* Whether to skip the partition merge stage after indexing.
* Useful for distributed indexing where merges are handled separately.
*/
skipMerge?: boolean;
}
export class Index {
@@ -726,6 +732,7 @@ export class Index {
options?.ngramMinLength,
options?.ngramMaxLength,
options?.prefixOnly,
options?.skipMerge,
),
);
}

View File

@@ -157,6 +157,7 @@ impl Index {
ngram_min_length: Option<u32>,
ngram_max_length: Option<u32>,
prefix_only: Option<bool>,
skip_merge: Option<bool>,
) -> Self {
let mut opts = FtsIndexBuilder::default();
if let Some(with_position) = with_position {
@@ -192,6 +193,9 @@ impl Index {
if let Some(prefix_only) = prefix_only {
opts = opts.ngram_prefix_only(prefix_only);
}
if let Some(skip_merge) = skip_merge {
opts = opts.skip_merge(skip_merge);
}
Self {
inner: Mutex::new(Some(LanceDbIndex::FTS(opts))),

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.27.0-beta.0"
current_version = "0.26.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.27.0-beta.0"
version = "0.26.1"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -179,7 +179,6 @@ class Table:
cleanup_since_ms: Optional[int] = None,
delete_unverified: Optional[bool] = None,
) -> OptimizeStats: ...
async def uri(self) -> str: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...

View File

@@ -127,6 +127,9 @@ class FTS:
ascii_folding : bool, default True
Whether to fold ASCII characters. This converts accented characters to
their ASCII equivalent. For example, "café" would be converted to "cafe".
skip_merge : bool, default False
Whether to skip the partition merge stage after indexing. This can be
useful for distributed indexing where merges are handled separately.
"""
with_position: bool = False
@@ -140,6 +143,7 @@ class FTS:
ngram_min_length: int = 3
ngram_max_length: int = 3
prefix_only: bool = False
skip_merge: bool = False
@dataclass

View File

@@ -176,6 +176,7 @@ class RemoteTable(Table):
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
skip_merge: bool = False,
name: Optional[str] = None,
):
config = FTS(
@@ -190,6 +191,7 @@ class RemoteTable(Table):
ngram_min_length=ngram_min_length,
ngram_max_length=ngram_max_length,
prefix_only=prefix_only,
skip_merge=skip_merge,
)
LOOP.run(
self._table.create_index(
@@ -655,14 +657,6 @@ class RemoteTable(Table):
def stats(self):
return LOOP.run(self._table.stats())
@property
def uri(self) -> str:
"""The table URI (storage location).
For remote tables, this fetches the location from the server via describe.
"""
return LOOP.run(self._table.uri())
def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder:
return LanceTakeQueryBuilder(self._table.take_offsets(offsets))

View File

@@ -892,6 +892,7 @@ class Table(ABC):
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
skip_merge: bool = False,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
@@ -956,6 +957,9 @@ class Table(ABC):
The maximum length of an n-gram.
prefix_only: bool, default False
Whether to only index the prefix of the token for ngram tokenizer.
skip_merge: bool, default False
Only available with use_tantivy=False.
If True, skip the partition merge stage after indexing.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
@@ -2218,10 +2222,6 @@ class LanceTable(Table):
def stats(self) -> TableStatistics:
return LOOP.run(self._table.stats())
@property
def uri(self) -> str:
return LOOP.run(self._table.uri())
def create_scalar_index(
self,
column: str,
@@ -2263,6 +2263,7 @@ class LanceTable(Table):
ngram_min_length: int = 3,
ngram_max_length: int = 3,
prefix_only: bool = False,
skip_merge: bool = False,
name: Optional[str] = None,
):
if not use_tantivy:
@@ -2286,6 +2287,8 @@ class LanceTable(Table):
else:
tokenizer_configs = self.infer_tokenizer_configs(tokenizer_name)
tokenizer_configs["skip_merge"] = skip_merge
config = FTS(
**tokenizer_configs,
)
@@ -3610,20 +3613,6 @@ class AsyncTable:
"""
return await self._inner.stats()
async def uri(self) -> str:
"""
Get the table URI (storage location).
For remote tables, this fetches the location from the server via describe.
For local tables, this returns the dataset URI.
Returns
-------
str
The full storage location of the table (e.g., S3/GCS path).
"""
return await self._inner.uri()
async def add(
self,
data: DATA,

View File

@@ -405,7 +405,10 @@ def test_table_create_indices():
# Test create_fts_index with custom name
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
"text",
wait_timeout=timedelta(seconds=2),
name="custom_fts_idx",
skip_merge=True,
)
# Test create_index with custom name
@@ -427,6 +430,7 @@ def test_table_create_indices():
fts_req = received_requests[1]
assert "name" in fts_req
assert fts_req["name"] == "custom_fts_idx"
assert fts_req["skip_merge"] is True
# Check vector index request has custom name
vector_req = received_requests[2]

View File

@@ -1967,9 +1967,3 @@ def test_add_table_with_empty_embeddings(tmp_path):
on_bad_vectors="drop",
)
assert table.count_rows() == 1
def test_table_uri(tmp_path):
db = lancedb.connect(tmp_path)
table = db.create_table("my_table", data=[{"x": 0}])
assert table.uri == str(tmp_path / "my_table.lance")

View File

@@ -50,7 +50,8 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
.ascii_folding(params.ascii_folding)
.ngram_min_length(params.ngram_min_length)
.ngram_max_length(params.ngram_max_length)
.ngram_prefix_only(params.prefix_only);
.ngram_prefix_only(params.prefix_only)
.skip_merge(params.skip_merge);
Ok(LanceDbIndex::FTS(inner_opts))
},
"IvfFlat" => {
@@ -179,6 +180,7 @@ struct FtsParams {
ngram_min_length: u32,
ngram_max_length: u32,
prefix_only: bool,
skip_merge: bool,
}
#[derive(FromPyObject)]

View File

@@ -504,11 +504,6 @@ impl Table {
})
}
pub fn uri(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move { inner.uri().await.infer_error() })
}
pub fn __repr__(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),

View File

@@ -463,20 +463,9 @@ impl ListingDatabase {
validate_table_name(name)?;
let mut uri = self.uri.clone();
// If the URI does not end with a path separator, add one
// Use forward slash for URIs (http://, s3://, gs://, file://, etc.)
// Use platform-specific separator for local paths without scheme
let has_scheme = uri.contains("://");
let ends_with_separator = uri.ends_with('/') || uri.ends_with('\\');
if !ends_with_separator {
if has_scheme {
// URIs always use forward slash
uri.push('/');
} else {
// Local path without scheme - use platform separator
uri.push(std::path::MAIN_SEPARATOR);
}
// If the URI does not end with a slash, add one
if !uri.ends_with('/') {
uri.push('/');
}
// Append the table name with the lance file extension
uri.push_str(&format!("{}.{}", name, LANCE_FILE_EXTENSION));
@@ -1082,7 +1071,6 @@ mod tests {
use crate::table::{Table, TableDefinition};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use std::path::PathBuf;
use tempfile::tempdir;
async fn setup_database() -> (tempfile::TempDir, ListingDatabase) {
@@ -2058,19 +2046,6 @@ mod tests {
assert_eq!(db_options.new_table_config.enable_stable_row_ids, None);
}
#[tokio::test]
async fn test_table_uri() {
let (_tempdir, db) = setup_database().await;
let mut pb = PathBuf::new();
pb.push(db.uri.clone());
pb.push("test.lance");
let expected = pb.to_str().unwrap();
let uri = db.table_uri("test").ok().unwrap();
assert_eq!(uri, expected);
}
#[tokio::test]
async fn test_namespace_client() {
let (_tempdir, db) = setup_database().await;

View File

@@ -53,5 +53,8 @@ pub struct LabelListIndexBuilder {}
pub use lance_index::scalar::inverted::query::*;
pub use lance_index::scalar::FullTextSearchQuery;
/// Builder for full text search (FTS) index parameters.
///
/// Use [`FtsIndexBuilder::skip_merge`] to skip the partition merge stage after indexing.
pub use lance_index::scalar::InvertedIndexParams as FtsIndexBuilder;
pub use lance_index::scalar::InvertedIndexParams;

View File

@@ -204,7 +204,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
server_version: ServerVersion,
version: RwLock<Option<u64>>,
location: RwLock<Option<String>>,
}
impl<S: HttpSend> RemoteTable<S> {
@@ -222,7 +221,6 @@ impl<S: HttpSend> RemoteTable<S> {
identifier,
server_version,
version: RwLock::new(None),
location: RwLock::new(None),
}
}
@@ -641,7 +639,6 @@ impl<S: HttpSend> RemoteTable<S> {
struct TableDescription {
version: u64,
schema: JsonSchema,
location: Option<String>,
}
impl<S: HttpSend> std::fmt::Display for RemoteTable<S> {
@@ -670,7 +667,6 @@ mod test_utils {
identifier: name,
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
location: RwLock::new(None),
}
}
}
@@ -1465,28 +1461,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
message: "table_definition is not supported on LanceDB cloud.".into(),
})
}
async fn uri(&self) -> Result<String> {
// Check if we already have the location cached
{
let location = self.location.read().await;
if let Some(ref loc) = *location {
return Ok(loc.clone());
}
}
// Fetch from server via describe
let description = self.describe().await?;
let location = description.location.ok_or_else(|| Error::NotSupported {
message: "Table URI not supported by the server".into(),
})?;
// Cache the location for future use
{
let mut cached_location = self.location.write().await;
*cached_location = Some(location.clone());
}
Ok(location)
fn dataset_uri(&self) -> &str {
"NOT_SUPPORTED"
}
async fn storage_options(&self) -> Option<HashMap<String, String>> {
@@ -2648,6 +2624,11 @@ mod tests {
serde_json::to_value(InvertedIndexParams::default()).unwrap(),
Index::FTS(Default::default()),
),
(
"FTS",
serde_json::to_value(InvertedIndexParams::default().skip_merge(true)).unwrap(),
Index::FTS(InvertedIndexParams::default().skip_merge(true)),
),
];
for (index_type, expected_body, index) in cases {
@@ -3356,69 +3337,4 @@ mod tests {
let result = table.drop_columns(&["old_col1", "old_col2"]).await.unwrap();
assert_eq!(result.version, 5);
}
#[tokio::test]
async fn test_uri() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "schema": {"fields": []}, "location": "s3://bucket/path/to/table"}"#)
.unwrap()
});
let uri = table.uri().await.unwrap();
assert_eq!(uri, "s3://bucket/path/to/table");
}
#[tokio::test]
async fn test_uri_missing_location() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
// Server returns response without location field
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "schema": {"fields": []}}"#)
.unwrap()
});
let result = table.uri().await;
assert!(result.is_err());
assert!(matches!(&result, Err(Error::NotSupported { .. })));
}
#[tokio::test]
async fn test_uri_caching() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = call_count.clone();
let table = Table::new_with_handler("my_table", move |request| {
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
call_count_clone.fetch_add(1, Ordering::SeqCst);
http::Response::builder()
.status(200)
.body(
r#"{"version": 1, "schema": {"fields": []}, "location": "gs://bucket/table"}"#,
)
.unwrap()
});
// First call should fetch from server
let uri1 = table.uri().await.unwrap();
assert_eq!(uri1, "gs://bucket/table");
assert_eq!(call_count.load(Ordering::SeqCst), 1);
// Second call should use cached value
let uri2 = table.uri().await.unwrap();
assert_eq!(uri2, "gs://bucket/table");
assert_eq!(call_count.load(Ordering::SeqCst), 1); // Still 1, no new call
}
}

View File

@@ -608,8 +608,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn list_versions(&self) -> Result<Vec<Version>>;
/// Get the table definition.
async fn table_definition(&self) -> Result<TableDefinition>;
/// Get the table URI (storage location)
async fn uri(&self) -> Result<String>;
/// Get the table URI
fn dataset_uri(&self) -> &str;
/// Get the storage options used when opening this table, if any.
async fn storage_options(&self) -> Option<HashMap<String, String>>;
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
@@ -1317,12 +1317,11 @@ impl Table {
self.inner.list_indices().await
}
/// Get the table URI (storage location)
/// Get the underlying dataset URI
///
/// Returns the full storage location of the table (e.g., S3/GCS path).
/// For remote tables, this fetches the location from the server via describe.
pub async fn uri(&self) -> Result<String> {
self.inner.uri().await
/// Warning: This is an internal API and the return value is subject to change.
pub fn dataset_uri(&self) -> &str {
self.inner.dataset_uri()
}
/// Get the storage options used when opening this table, if any.
@@ -3235,8 +3234,8 @@ impl BaseTable for NativeTable {
Ok(results.into_iter().flatten().collect())
}
async fn uri(&self) -> Result<String> {
Ok(self.uri.clone())
fn dataset_uri(&self) -> &str {
self.uri.as_str()
}
async fn storage_options(&self) -> Option<HashMap<String, String>> {