feat!: upgrade lance to 0.16 (#1519)

This commit is contained in:
Lei Xu
2024-08-07 13:15:22 -07:00
committed by GitHub
parent 32123713fd
commit 2bdf0a02f9
16 changed files with 153 additions and 75 deletions

View File

@@ -20,20 +20,21 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]
[workspace.dependencies]
lance = { "version" = "=0.15.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.15.0" }
lance-linalg = { "version" = "=0.15.0" }
lance-testing = { "version" = "=0.15.0" }
lance-datafusion = { "version" = "=0.15.0" }
lance = { "version" = "=0.16.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.16.0" }
lance-linalg = { "version" = "=0.16.0" }
lance-testing = { "version" = "=0.16.0" }
lance-datafusion = { "version" = "=0.16.0" }
lance-encoding = { "version" = "=0.16.0" }
# Note that this one does not include pyarrow
arrow = { version = "52.1", optional = false }
arrow-array = "52.1"
arrow-data = "52.1"
arrow-ipc = "52.1"
arrow-ord = "52.1"
arrow-schema = "52.1"
arrow-arith = "52.1"
arrow-cast = "52.1"
arrow = { version = "52.2", optional = false }
arrow-array = "52.2"
arrow-data = "52.2"
arrow-ipc = "52.2"
arrow-ord = "52.2"
arrow-schema = "52.2"
arrow-arith = "52.2"
arrow-cast = "52.2"
async-trait = "0"
chrono = "0.4.35"
datafusion-physical-plan = "40.0"

View File

@@ -20,7 +20,6 @@ napi = { version = "2.16.8", default-features = false, features = [
"async",
] }
napi-derive = "2.16.4"
# Prevent dynamic linking of lzma, which comes from datafusion
lzma-sys = { version = "*", features = ["static"] }

View File

@@ -44,10 +44,20 @@ export interface CreateTableOptions {
* The available options are described at https://lancedb.github.io/lancedb/guides/storage/
*/
storageOptions?: Record<string, string>;
/**
* The version of the data storage format to use.
*
* The default is `legacy`, which is Lance format v1.
* `stable` is the new format, which is Lance format v2.
*/
dataStorageVersion?: string;
/**
* If true then data files will be written with the legacy format
*
* The default is true while the new format is in beta
*
* @deprecated Use `dataStorageVersion` instead.
*/
useLegacyFormat?: boolean;
schema?: SchemaLike;
@@ -247,12 +257,19 @@ export class LocalConnection extends Connection {
throw new Error("data is required");
}
const { buf, mode } = await Table.parseTableData(data, options);
let dataStorageVersion = "legacy";
if (options?.dataStorageVersion !== undefined) {
dataStorageVersion = options.dataStorageVersion;
} else if (options?.useLegacyFormat !== undefined) {
dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable";
}
const innerTable = await this.inner.createTable(
nameOrOptions,
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
options?.useLegacyFormat,
dataStorageVersion,
);
return new LocalTable(innerTable);
@@ -276,6 +293,13 @@ export class LocalConnection extends Connection {
metadata = registry.getTableMetadata([embeddingFunction]);
}
let dataStorageVersion = "legacy";
if (options?.dataStorageVersion !== undefined) {
dataStorageVersion = options.dataStorageVersion;
} else if (options?.useLegacyFormat !== undefined) {
dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable";
}
const table = makeEmptyTable(schema, metadata);
const buf = await fromTableToBuffer(table);
const innerTable = await this.inner.createEmptyTable(
@@ -283,7 +307,7 @@ export class LocalConnection extends Connection {
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
options?.useLegacyFormat,
dataStorageVersion,
);
return new LocalTable(innerTable);
}

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.7.2",
"version": "0.8.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.7.2",
"version": "0.8.0",
"cpu": [
"x64",
"arm64"

View File

@@ -13,13 +13,16 @@
// limitations under the License.
use std::collections::HashMap;
use std::str::FromStr;
use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::table::Table;
use crate::ConnectionOptions;
use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection, CreateTableMode};
use lancedb::connection::{
ConnectBuilder, Connection as LanceDBConnection, CreateTableMode, LanceFileVersion,
};
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
#[napi]
@@ -120,7 +123,7 @@ impl Connection {
buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
data_storage_options: Option<String>,
) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
@@ -131,8 +134,11 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
if let Some(data_storage_option) = data_storage_options.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_option)
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
);
}
let tbl = builder
.execute()
@@ -148,7 +154,7 @@ impl Connection {
schema_buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
data_storage_options: Option<String>,
) -> napi::Result<Table> {
let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| {
napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e))
@@ -163,8 +169,11 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
if let Some(data_storage_option) = data_storage_options.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_option)
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
);
}
let tbl = builder
.execute()

View File

@@ -293,6 +293,7 @@ impl Table {
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified: None,
error_if_tagged_old_versions: None,
})
.await
.default_error()?

View File

@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml
dependencies = [
"deprecation",
"pylance==0.15.0",
"pylance==0.16.0",
"ratelimiter~=1.0",
"requests>=2.31.0",
"retry>=0.9.2",

View File

@@ -24,7 +24,7 @@ class Connection(object):
mode: str,
data: pa.RecordBatchReader,
storage_options: Optional[Dict[str, str]] = None,
use_legacy_format: Optional[bool] = None,
data_storage_version: Optional[str] = None,
) -> Table: ...
async def create_empty_table(
self,
@@ -32,7 +32,7 @@ class Connection(object):
mode: str,
schema: pa.Schema,
storage_options: Optional[Dict[str, str]] = None,
use_legacy_format: Optional[bool] = None,
data_storage_version: Optional[str] = None,
) -> Table: ...
class Table:

View File

@@ -560,6 +560,7 @@ class AsyncConnection(object):
fill_value: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
*,
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -603,9 +604,15 @@ class AsyncConnection(object):
connection will be inherited by the table, but can be overridden here.
See available options at
https://lancedb.github.io/lancedb/guides/storage/
use_legacy_format: bool, optional, default True
data_storage_version: optional, str, default "legacy"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
use_legacy_format: bool, optional, default True. (Deprecated)
If True, use the legacy format for the table. If False, use the new format.
The default is True while the new format is in beta.
This parameter is deprecated; use `data_storage_version` instead.
Returns
@@ -765,13 +772,18 @@ class AsyncConnection(object):
if mode == "create" and exist_ok:
mode = "exist_ok"
if not data_storage_version:
data_storage_version = (
"legacy" if use_legacy_format is None or use_legacy_format else "stable"
)
if data is None:
new_table = await self._inner.create_empty_table(
name,
mode,
schema,
storage_options=storage_options,
use_legacy_format=use_legacy_format,
data_storage_version=data_storage_version,
)
else:
data = data_to_reader(data, schema)
@@ -780,7 +792,7 @@ class AsyncConnection(object):
mode,
data,
storage_options=storage_options,
use_legacy_format=use_legacy_format,
data_storage_version=data_storage_version,
)
return AsyncTable(new_table)

View File

@@ -730,7 +730,7 @@ def test_create_scalar_index(db):
indices = table.to_lance().list_indices()
assert len(indices) == 1
scalar_index = indices[0]
assert scalar_index["type"] == "Scalar"
assert scalar_index["type"] == "BTree"
# Confirm that prefiltering still works with the scalar index column
results = table.search().where("x = 'c'").to_arrow()

View File

@@ -1,21 +1,10 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::connection::{Connection as LanceConnection, CreateTableMode};
use lancedb::connection::{Connection as LanceConnection, CreateTableMode, LanceFileVersion};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods, Bound, PyAny, PyRef, PyResult, Python,
@@ -91,7 +80,7 @@ impl Connection {
mode: &str,
data: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
data_storage_version: Option<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -104,8 +93,11 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)
.map_err(|e| PyValueError::new_err(e.to_string()))?,
);
}
future_into_py(self_.py(), async move {
@@ -120,7 +112,7 @@ impl Connection {
mode: &str,
schema: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
data_storage_version: Option<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -134,8 +126,11 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)
.map_err(|e| PyValueError::new_err(e.to_string()))?,
);
}
future_into_py(self_.py(), async move {

View File

@@ -63,7 +63,10 @@ pub struct Table {
#[pymethods]
impl OptimizeStats {
/// Builds the Python `repr()` text for these stats.
///
/// The result has the form `OptimizeStats(compaction=..., prune=...)`, where each
/// value is the `Debug` rendering of the corresponding field.
pub fn __repr__(&self) -> String {
format!("OptimizeStats(compaction={:?}, prune={:?})", self.compaction, self.prune)
format!(
"OptimizeStats(compaction={:?}, prune={:?})",
self.compaction, self.prune
)
}
}
@@ -273,6 +276,7 @@ impl Table {
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified: None,
error_if_tagged_old_versions: None,
})
.await
.infer_error()?

View File

@@ -320,12 +320,19 @@ impl JsTable {
.map(|val| val.value(&mut cx))
.unwrap_or_default(),
);
let error_if_tagged_old_versions: Option<bool> = Some(
cx.argument_opt(2)
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
.map(|val| val.value(&mut cx))
.unwrap_or_default(),
);
rt.spawn(async move {
let stats = table
.optimize(OptimizeAction::Prune {
older_than: Some(older_than),
delete_unverified,
error_if_tagged_old_versions,
})
.await;

View File

@@ -29,6 +29,7 @@ lance-datafusion.workspace = true
lance-index = { workspace = true }
lance-linalg = { workspace = true }
lance-testing = { workspace = true }
lance-encoding = { workspace = true }
pin-project = { workspace = true }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
log.workspace = true
@@ -46,11 +47,11 @@ serde_with = { version = "3.8.1" }
reqwest = { version = "0.11.24", features = ["gzip", "json"], optional = true }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
hf-hub = {version = "0.3.2", optional = true}
hf-hub = { version = "0.3.2", optional = true }
candle-core = { version = "0.6.0", optional = true }
candle-transformers = { version = "0.6.0", optional = true }
candle-nn = { version = "0.6.0", optional = true }
tokenizers = { version = "0.19.1", optional = true}
tokenizers = { version = "0.19.1", optional = true }
[dev-dependencies]
tempfile = "3.5.0"
@@ -70,7 +71,13 @@ fp16kernels = ["lance-linalg/fp16kernels"]
s3-test = []
openai = ["dep:async-openai", "dep:reqwest"]
polars = ["dep:polars-arrow", "dep:polars"]
sentence-transformers = ["dep:hf-hub", "dep:candle-core", "dep:candle-transformers", "dep:candle-nn", "dep:tokenizers"]
sentence-transformers = [
"dep:hf-hub",
"dep:candle-core",
"dep:candle-transformers",
"dep:candle-nn",
"dep:tokenizers"
]
[[example]]
name = "openai"

View File

@@ -22,7 +22,7 @@ use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::{ReadParams, WriteMode};
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore};
use object_store::{aws::AwsCredential, local::LocalFileSystem};
use snafu::prelude::*;
@@ -35,6 +35,7 @@ use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::{NativeTable, TableDefinition, WriteOptions};
use crate::utils::validate_table_name;
use crate::Table;
pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
use log::warn;
@@ -140,7 +141,7 @@ pub struct CreateTableBuilder<const HAS_DATA: bool, T: IntoArrow> {
pub(crate) write_options: WriteOptions,
pub(crate) table_definition: Option<TableDefinition>,
pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc<dyn EmbeddingFunction>)>,
pub(crate) use_legacy_format: bool,
pub(crate) data_storage_version: Option<LanceFileVersion>,
}
// Builder methods that only apply when we have initial data
@@ -154,7 +155,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
write_options: WriteOptions::default(),
table_definition: None,
embeddings: Vec::new(),
use_legacy_format: true,
data_storage_version: None,
}
}
@@ -186,7 +187,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
mode: self.mode,
write_options: self.write_options,
embeddings: self.embeddings,
use_legacy_format: self.use_legacy_format,
data_storage_version: self.data_storage_version,
};
Ok((data, builder))
}
@@ -220,7 +221,7 @@ impl CreateTableBuilder<false, NoData> {
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
embeddings: Vec::new(),
use_legacy_format: true,
data_storage_version: None,
}
}
@@ -283,6 +284,14 @@ impl<const HAS_DATA: bool, T: IntoArrow> CreateTableBuilder<HAS_DATA, T> {
self
}
/// Choose the Lance file format version used when writing the table's data files.
///
/// The builder starts with no explicit version; in that case the default,
/// `LanceFileVersion::Legacy`, is used.
pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self {
    // Record the caller's explicit choice so it overrides the unset default.
    let requested = Some(data_storage_version);
    self.data_storage_version = requested;
    self
}
/// Set to true to use the v1 format for data files
///
/// This is currently defaulted to true and can be set to false to opt-in
@@ -292,8 +301,13 @@ impl<const HAS_DATA: bool, T: IntoArrow> CreateTableBuilder<HAS_DATA, T> {
///
/// Once the new format is stable, the default will change to `false` for
/// several releases and then eventually this option will be removed.
#[deprecated(since = "0.9.0", note = "use data_storage_version instead")]
pub fn use_legacy_format(mut self, use_legacy_format: bool) -> Self {
self.use_legacy_format = use_legacy_format;
// Translate the legacy boolean into an explicit file version:
// `true` selects `LanceFileVersion::Legacy`, `false` selects `LanceFileVersion::Stable`.
self.data_storage_version = if use_legacy_format {
Some(LanceFileVersion::Legacy)
} else {
Some(LanceFileVersion::Stable)
};
self
}
}
@@ -789,13 +803,14 @@ impl Database {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = options.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(&plain_uri, &os_params).await?;
ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
@@ -961,7 +976,7 @@ impl ConnectionInternal for Database {
if matches!(&options.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
write_params.use_legacy_format = options.use_legacy_format;
write_params.data_storage_version = options.data_storage_version;
match NativeTable::create(
&table_uri,

View File

@@ -191,6 +191,8 @@ pub enum OptimizeAction {
/// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
delete_unverified: Option<bool>,
/// If true, an error will be returned if there are any old versions that are still tagged.
error_if_tagged_old_versions: Option<bool>,
},
/// Optimize the indices
///
@@ -1079,8 +1081,8 @@ impl NativeTable {
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
// Default params uses format v1.
let params = params.unwrap_or(WriteParams {
use_legacy_format: true,
..Default::default()
});
// patch the params if we have a write store wrapper
@@ -1173,12 +1175,13 @@ impl NativeTable {
&self,
older_than: Duration,
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
) -> Result<RemovalStats> {
Ok(self
.dataset
.get_mut()
.await?
.cleanup_old_versions(older_than, delete_unverified)
.cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions)
.await?)
}
@@ -1506,8 +1509,8 @@ impl NativeTable {
}
let mut dataset = self.dataset.get_mut().await?;
let lance_idx_params = lance::index::scalar::ScalarIndexParams {
force_index_type: Some(lance::index::scalar::ScalarIndexType::BTree),
let lance_idx_params = lance_index::scalar::ScalarIndexParams {
force_index_type: Some(lance_index::scalar::ScalarIndexType::BTree),
};
dataset
.create_index(
@@ -1607,6 +1610,9 @@ impl TableInternal for NativeTable {
let data =
MaybeEmbedded::try_new(data, self.table_definition().await?, add.embedding_registry)?;
// Still use the legacy lance format (v1) by default.
// We don't want to accidentally switch to v2 format during an add operation.
// If the table is already v2 this won't have any effect.
let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
mode: match add.mode {
AddDataMode::Append => WriteMode::Append,
@@ -1628,16 +1634,11 @@ impl TableInternal for NativeTable {
}
// patch the params if we have a write store wrapper
let mut lance_params = match self.store_wrapper.clone() {
let lance_params = match self.store_wrapper.clone() {
Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?,
None => lance_params,
};
// Only use the new format if the user passes use_legacy_format=False in while creating
// a table with data. We don't want to accidentally switch to v2 format during an add
// operation. If the table is already v2 this won't have any effect.
lance_params.use_legacy_format = true;
self.dataset.ensure_mutable().await?;
let dataset = Dataset::write(data, &self.uri, Some(lance_params)).await?;
@@ -1878,6 +1879,7 @@ impl TableInternal for NativeTable {
.optimize(OptimizeAction::Prune {
older_than: None,
delete_unverified: None,
error_if_tagged_old_versions: None,
})
.await?
.prune;
@@ -1893,11 +1895,13 @@ impl TableInternal for NativeTable {
OptimizeAction::Prune {
older_than,
delete_unverified,
error_if_tagged_old_versions,
} => {
stats.prune = Some(
self.cleanup_old_versions(
older_than.unwrap_or(Duration::try_days(7).expect("valid delta")),
delete_unverified,
error_if_tagged_old_versions,
)
.await?,
);