diff --git a/Cargo.toml b/Cargo.toml index 51a87245..50120900 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,20 +20,21 @@ keywords = ["lancedb", "lance", "database", "vector", "search"] categories = ["database-implementations"] [workspace.dependencies] -lance = { "version" = "=0.15.0", "features" = ["dynamodb"] } -lance-index = { "version" = "=0.15.0" } -lance-linalg = { "version" = "=0.15.0" } -lance-testing = { "version" = "=0.15.0" } -lance-datafusion = { "version" = "=0.15.0" } +lance = { "version" = "=0.16.0", "features" = ["dynamodb"] } +lance-index = { "version" = "=0.16.0" } +lance-linalg = { "version" = "=0.16.0" } +lance-testing = { "version" = "=0.16.0" } +lance-datafusion = { "version" = "=0.16.0" } +lance-encoding = { "version" = "=0.16.0" } # Note that this one does not include pyarrow -arrow = { version = "52.1", optional = false } -arrow-array = "52.1" -arrow-data = "52.1" -arrow-ipc = "52.1" -arrow-ord = "52.1" -arrow-schema = "52.1" -arrow-arith = "52.1" -arrow-cast = "52.1" +arrow = { version = "52.2", optional = false } +arrow-array = "52.2" +arrow-data = "52.2" +arrow-ipc = "52.2" +arrow-ord = "52.2" +arrow-schema = "52.2" +arrow-arith = "52.2" +arrow-cast = "52.2" async-trait = "0" chrono = "0.4.35" datafusion-physical-plan = "40.0" diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml index da304dbe..b4226e9f 100644 --- a/nodejs/Cargo.toml +++ b/nodejs/Cargo.toml @@ -20,7 +20,6 @@ napi = { version = "2.16.8", default-features = false, features = [ "async", ] } napi-derive = "2.16.4" - # Prevent dynamic linking of lzma, which comes from datafusion lzma-sys = { version = "*", features = ["static"] } diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 624a8f0a..c0ef9540 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -44,10 +44,20 @@ export interface CreateTableOptions { * The available options are described at https://lancedb.github.io/lancedb/guides/storage/ */ storageOptions?: Record; + /** + * The version of the data storage format to use. + * + * The default is `legacy`, which is Lance format v1. + * `stable` is the new format, which is Lance format v2. + */ + dataStorageVersion?: string; + /** * If true then data files will be written with the legacy format * * The default is true while the new format is in beta + * + * Deprecated. */ useLegacyFormat?: boolean; schema?: SchemaLike; @@ -247,12 +257,19 @@ export class LocalConnection extends Connection { throw new Error("data is required"); } const { buf, mode } = await Table.parseTableData(data, options); + let dataStorageVersion = "legacy"; + if (options?.dataStorageVersion !== undefined) { + dataStorageVersion = options.dataStorageVersion; + } else if (options?.useLegacyFormat !== undefined) { + dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable"; + } + const innerTable = await this.inner.createTable( nameOrOptions, buf, mode, cleanseStorageOptions(options?.storageOptions), - options?.useLegacyFormat, + dataStorageVersion, ); return new LocalTable(innerTable); @@ -276,6 +293,13 @@ export class LocalConnection extends Connection { metadata = registry.getTableMetadata([embeddingFunction]); } + let dataStorageVersion = "legacy"; + if (options?.dataStorageVersion !== undefined) { + dataStorageVersion = options.dataStorageVersion; + } else if (options?.useLegacyFormat !== undefined) { + dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable"; + } + const table = makeEmptyTable(schema, metadata); const buf = await fromTableToBuffer(table); const innerTable = await this.inner.createEmptyTable( @@ -283,7 +307,7 @@ export class LocalConnection extends Connection { buf, mode, cleanseStorageOptions(options?.storageOptions), - options?.useLegacyFormat, + dataStorageVersion, ); return new LocalTable(innerTable); } diff --git a/nodejs/package-lock.json b/nodejs/package-lock.json index 6f3711a8..d2ba91a8 100644 --- a/nodejs/package-lock.json +++ b/nodejs/package-lock.json @@ -1,12 +1,12 @@ { "name": "@lancedb/lancedb", - "version": "0.7.2", + "version": "0.8.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@lancedb/lancedb", - "version": "0.7.2", + "version": "0.8.0", "cpu": [ "x64", "arm64" diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index c19453db..e0510ab3 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -13,13 +13,16 @@ // limitations under the License. use std::collections::HashMap; +use std::str::FromStr; use napi::bindgen_prelude::*; use napi_derive::*; use crate::table::Table; use crate::ConnectionOptions; -use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection, CreateTableMode}; +use lancedb::connection::{ + ConnectBuilder, Connection as LanceDBConnection, CreateTableMode, LanceFileVersion, +}; use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema}; #[napi] @@ -120,7 +123,7 @@ impl Connection { buf: Buffer, mode: String, storage_options: Option>, - use_legacy_format: Option, + data_storage_options: Option, ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; @@ -131,8 +134,11 @@ impl Connection { builder = builder.storage_option(key, value); } } - if let Some(use_legacy_format) = use_legacy_format { - builder = builder.use_legacy_format(use_legacy_format); + if let Some(data_storage_option) = data_storage_options.as_ref() { + builder = builder.data_storage_version( + LanceFileVersion::from_str(data_storage_option) + .map_err(|e| napi::Error::from_reason(format!("{}", e)))?, + ); } let tbl = builder .execute() @@ -148,7 +154,7 @@ impl Connection { schema_buf: Buffer, mode: String, storage_options: Option>, - use_legacy_format: Option, + data_storage_options: Option, ) -> napi::Result
{ let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| { napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e)) @@ -163,8 +169,11 @@ impl Connection { builder = builder.storage_option(key, value); } } - if let Some(use_legacy_format) = use_legacy_format { - builder = builder.use_legacy_format(use_legacy_format); + if let Some(data_storage_option) = data_storage_options.as_ref() { + builder = builder.data_storage_version( + LanceFileVersion::from_str(data_storage_option) + .map_err(|e| napi::Error::from_reason(format!("{}", e)))?, + ); } let tbl = builder .execute() diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 1817abf6..90925d1d 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -293,6 +293,7 @@ impl Table { .optimize(OptimizeAction::Prune { older_than, delete_unverified: None, + error_if_tagged_old_versions: None, }) .await .default_error()? diff --git a/python/pyproject.toml b/python/pyproject.toml index 1a87323d..afb4ce60 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -3,7 +3,7 @@ name = "lancedb" # version in Cargo.toml dependencies = [ "deprecation", - "pylance==0.15.0", + "pylance==0.16.0", "ratelimiter~=1.0", "requests>=2.31.0", "retry>=0.9.2", diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index f930087d..a143e308 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -24,7 +24,7 @@ class Connection(object): mode: str, data: pa.RecordBatchReader, storage_options: Optional[Dict[str, str]] = None, - use_legacy_format: Optional[bool] = None, + data_storage_version: Optional[str] = None, ) -> Table: ... async def create_empty_table( self, @@ -32,7 +32,7 @@ class Connection(object): mode: str, schema: pa.Schema, storage_options: Optional[Dict[str, str]] = None, - use_legacy_format: Optional[bool] = None, + data_storage_version: Optional[str] = None, ) -> Table: ... class Table: diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 62be2042..50046080 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -560,6 +560,7 @@ class AsyncConnection(object): fill_value: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, *, + data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, ) -> AsyncTable: """Create an [AsyncTable][lancedb.table.AsyncTable] in the database. @@ -603,9 +604,15 @@ class AsyncConnection(object): connection will be inherited by the table, but can be overridden here. See available options at https://lancedb.github.io/lancedb/guides/storage/ - use_legacy_format: bool, optional, default True + data_storage_version: optional, str, default "legacy" + The version of the data storage format to use. Newer versions are more + efficient but require newer versions of lance to read. The default is + "legacy" which will use the legacy v1 version. See the user guide + for more details. + use_legacy_format: bool, optional, default True. (Deprecated) If True, use the legacy format for the table. If False, use the new format. The default is True while the new format is in beta. + This method is deprecated, use `data_storage_version` instead. Returns @@ -765,13 +772,18 @@ class AsyncConnection(object): if mode == "create" and exist_ok: mode = "exist_ok" + if not data_storage_version: + data_storage_version = ( + "legacy" if use_legacy_format is None or use_legacy_format else "stable" + ) + if data is None: new_table = await self._inner.create_empty_table( name, mode, schema, storage_options=storage_options, - use_legacy_format=use_legacy_format, + data_storage_version=data_storage_version, ) else: data = data_to_reader(data, schema) @@ -780,7 +792,7 @@ class AsyncConnection(object): mode, data, storage_options=storage_options, - use_legacy_format=use_legacy_format, + data_storage_version=data_storage_version, ) return AsyncTable(new_table) diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index dac73570..832fa76e 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -730,7 +730,7 @@ def test_create_scalar_index(db): indices = table.to_lance().list_indices() assert len(indices) == 1 scalar_index = indices[0] - assert scalar_index["type"] == "Scalar" + assert scalar_index["type"] == "BTree" # Confirm that prefiltering still works with the scalar index column results = table.search().where("x = 'c'").to_arrow() diff --git a/python/src/connection.rs b/python/src/connection.rs index 815ae5db..43c8c6ca 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -1,21 +1,10 @@ -// Copyright 2024 Lance Developers. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow}; -use lancedb::connection::{Connection as LanceConnection, CreateTableMode}; +use lancedb::connection::{Connection as LanceConnection, CreateTableMode, LanceFileVersion}; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, pyclass, pyfunction, pymethods, Bound, PyAny, PyRef, PyResult, Python, @@ -91,7 +80,7 @@ impl Connection { mode: &str, data: Bound<'_, PyAny>, storage_options: Option>, - use_legacy_format: Option, + data_storage_version: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -104,8 +93,11 @@ impl Connection { builder = builder.storage_options(storage_options); } - if let Some(use_legacy_format) = use_legacy_format { - builder = builder.use_legacy_format(use_legacy_format); + if let Some(data_storage_version) = data_storage_version.as_ref() { + builder = builder.data_storage_version( + LanceFileVersion::from_str(data_storage_version) + .map_err(|e| PyValueError::new_err(e.to_string()))?, + ); } future_into_py(self_.py(), async move { @@ -120,7 +112,7 @@ impl Connection { mode: &str, schema: Bound<'_, PyAny>, storage_options: Option>, - use_legacy_format: Option, + data_storage_version: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -134,8 +126,11 @@ impl Connection { builder = builder.storage_options(storage_options); } - if let Some(use_legacy_format) = use_legacy_format { - builder = builder.use_legacy_format(use_legacy_format); + if let Some(data_storage_version) = data_storage_version.as_ref() { + builder = builder.data_storage_version( + LanceFileVersion::from_str(data_storage_version) + .map_err(|e| PyValueError::new_err(e.to_string()))?, + ); } future_into_py(self_.py(), async move { diff --git a/python/src/table.rs b/python/src/table.rs index 443591ca..497b0ca2 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -63,7 +63,10 @@ pub struct Table { #[pymethods] impl OptimizeStats { pub fn __repr__(&self) -> String { - format!("OptimizeStats(compaction={:?}, prune={:?})", self.compaction, self.prune) + format!( + "OptimizeStats(compaction={:?}, prune={:?})", + self.compaction, self.prune + ) } } @@ -273,6 +276,7 @@ impl Table { .optimize(OptimizeAction::Prune { older_than, delete_unverified: None, + error_if_tagged_old_versions: None, }) .await .infer_error()? diff --git a/rust/ffi/node/src/table.rs b/rust/ffi/node/src/table.rs index b47af535..10e7f19b 100644 --- a/rust/ffi/node/src/table.rs +++ b/rust/ffi/node/src/table.rs @@ -320,12 +320,19 @@ impl JsTable { .map(|val| val.value(&mut cx)) .unwrap_or_default(), ); + let error_if_tagged_old_versions: Option = Some( + cx.argument_opt(2) + .and_then(|val| val.downcast::(&mut cx).ok()) + .map(|val| val.value(&mut cx)) + .unwrap_or_default(), + ); rt.spawn(async move { let stats = table .optimize(OptimizeAction::Prune { older_than: Some(older_than), delete_unverified, + error_if_tagged_old_versions, }) .await; diff --git a/rust/lancedb/Cargo.toml b/rust/lancedb/Cargo.toml index ccce9558..a072a18e 100644 --- a/rust/lancedb/Cargo.toml +++ b/rust/lancedb/Cargo.toml @@ -29,6 +29,7 @@ lance-datafusion.workspace = true lance-index = { workspace = true } lance-linalg = { workspace = true } lance-testing = { workspace = true } +lance-encoding = { workspace = true } pin-project = { workspace = true } tokio = { version = "1.23", features = ["rt-multi-thread"] } log.workspace = true @@ -46,11 +47,11 @@ serde_with = { version = "3.8.1" } reqwest = { version = "0.11.24", features = ["gzip", "json"], optional = true } polars-arrow = { version = ">=0.37,<0.40.0", optional = true } polars = { version = ">=0.37,<0.40.0", optional = true } -hf-hub = {version = "0.3.2", optional = true} +hf-hub = { version = "0.3.2", optional = true } candle-core = { version = "0.6.0", optional = true } candle-transformers = { version = "0.6.0", optional = true } candle-nn = { version = "0.6.0", optional = true } -tokenizers = { version = "0.19.1", optional = true} +tokenizers = { version = "0.19.1", optional = true } [dev-dependencies] tempfile = "3.5.0" @@ -70,7 +71,13 @@ fp16kernels = ["lance-linalg/fp16kernels"] s3-test = [] openai = ["dep:async-openai", "dep:reqwest"] polars = ["dep:polars-arrow", "dep:polars"] -sentence-transformers = ["dep:hf-hub", "dep:candle-core", "dep:candle-transformers", "dep:candle-nn", "dep:tokenizers"] +sentence-transformers = [ + "dep:hf-hub", + "dep:candle-core", + "dep:candle-transformers", + "dep:candle-nn", + "dep:tokenizers" +] [[example]] name = "openai" diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 955d9a56..700446ec 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow_array::{RecordBatchIterator, RecordBatchReader}; use arrow_schema::SchemaRef; use lance::dataset::{ReadParams, WriteMode}; -use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore}; +use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore}; use object_store::{aws::AwsCredential, local::LocalFileSystem}; use snafu::prelude::*; @@ -35,6 +35,7 @@ use crate::io::object_store::MirroringObjectStoreWrapper; use crate::table::{NativeTable, TableDefinition, WriteOptions}; use crate::utils::validate_table_name; use crate::Table; +pub use lance_encoding::version::LanceFileVersion; #[cfg(feature = "remote")] use log::warn; @@ -140,7 +141,7 @@ pub struct CreateTableBuilder { pub(crate) write_options: WriteOptions, pub(crate) table_definition: Option, pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc)>, - pub(crate) use_legacy_format: bool, + pub(crate) data_storage_version: Option, } // Builder methods that only apply when we have initial data @@ -154,7 +155,7 @@ impl CreateTableBuilder { write_options: WriteOptions::default(), table_definition: None, embeddings: Vec::new(), - use_legacy_format: true, + data_storage_version: None, } } @@ -186,7 +187,7 @@ impl CreateTableBuilder { mode: self.mode, write_options: self.write_options, embeddings: self.embeddings, - use_legacy_format: self.use_legacy_format, + data_storage_version: self.data_storage_version, }; Ok((data, builder)) } @@ -220,7 +221,7 @@ impl CreateTableBuilder { mode: CreateTableMode::default(), write_options: WriteOptions::default(), embeddings: Vec::new(), - use_legacy_format: true, + data_storage_version: None, } } @@ -283,6 +284,14 @@ impl CreateTableBuilder { self } + /// Set the data storage version. + /// + /// The default is `LanceFileVersion::Legacy`. + pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self { + self.data_storage_version = Some(data_storage_version); + self + } + /// Set to true to use the v1 format for data files /// /// This is currently defaulted to true and can be set to false to opt-in @@ -292,8 +301,13 @@ impl CreateTableBuilder { /// /// Once the new format is stable, the default will change to `false` for /// several releases and then eventually this option will be removed. + #[deprecated(since = "0.9.0", note = "use data_storage_version instead")] pub fn use_legacy_format(mut self, use_legacy_format: bool) -> Self { - self.use_legacy_format = use_legacy_format; + self.data_storage_version = if use_legacy_format { + Some(LanceFileVersion::Legacy) + } else { + Some(LanceFileVersion::Stable) + }; self } } @@ -789,13 +803,14 @@ impl Database { let plain_uri = url.to_string(); + let registry = Arc::new(ObjectStoreRegistry::default()); let storage_options = options.storage_options.clone(); let os_params = ObjectStoreParams { storage_options: Some(storage_options.clone()), ..Default::default() }; let (object_store, base_path) = - ObjectStore::from_uri_and_params(&plain_uri, &os_params).await?; + ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?; if object_store.is_local() { Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; } @@ -961,7 +976,7 @@ impl ConnectionInternal for Database { if matches!(&options.mode, CreateTableMode::Overwrite) { write_params.mode = WriteMode::Overwrite; } - write_params.use_legacy_format = options.use_legacy_format; + write_params.data_storage_version = options.data_storage_version; match NativeTable::create( &table_uri, diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 77126517..ffb763bd 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -191,6 +191,8 @@ pub enum OptimizeAction { /// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default. /// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`. delete_unverified: Option, + /// If true, an error will be returned if there are any old versions that are still tagged. + error_if_tagged_old_versions: Option, }, /// Optimize the indices /// @@ -1079,8 +1081,8 @@ impl NativeTable { params: Option, read_consistency_interval: Option, ) -> Result { + // Default params uses format v1. let params = params.unwrap_or(WriteParams { - use_legacy_format: true, ..Default::default() }); // patch the params if we have a write store wrapper @@ -1173,12 +1175,13 @@ impl NativeTable { &self, older_than: Duration, delete_unverified: Option, + error_if_tagged_old_versions: Option, ) -> Result { Ok(self .dataset .get_mut() .await? - .cleanup_old_versions(older_than, delete_unverified) + .cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions) .await?) } @@ -1506,8 +1509,8 @@ impl NativeTable { } let mut dataset = self.dataset.get_mut().await?; - let lance_idx_params = lance::index::scalar::ScalarIndexParams { - force_index_type: Some(lance::index::scalar::ScalarIndexType::BTree), + let lance_idx_params = lance_index::scalar::ScalarIndexParams { + force_index_type: Some(lance_index::scalar::ScalarIndexType::BTree), }; dataset .create_index( @@ -1607,6 +1610,9 @@ impl TableInternal for NativeTable { let data = MaybeEmbedded::try_new(data, self.table_definition().await?, add.embedding_registry)?; + // Still use the legacy lance format (v1) by default. + // We don't want to accidentally switch to v2 format during an add operation. + // If the table is already v2 this won't have any effect. let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { mode: match add.mode { AddDataMode::Append => WriteMode::Append, @@ -1628,16 +1634,11 @@ impl TableInternal for NativeTable { } // patch the params if we have a write store wrapper - let mut lance_params = match self.store_wrapper.clone() { + let lance_params = match self.store_wrapper.clone() { Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?, None => lance_params, }; - // Only use the new format if the user passes use_legacy_format=False in while creating - // a table with data. We don't want to accidentally switch to v2 format during an add - // operation. If the table is already v2 this won't have any effect. - lance_params.use_legacy_format = true; - self.dataset.ensure_mutable().await?; let dataset = Dataset::write(data, &self.uri, Some(lance_params)).await?; @@ -1878,6 +1879,7 @@ impl TableInternal for NativeTable { .optimize(OptimizeAction::Prune { older_than: None, delete_unverified: None, + error_if_tagged_old_versions: None, }) .await? .prune; @@ -1893,11 +1895,13 @@ impl TableInternal for NativeTable { OptimizeAction::Prune { older_than, delete_unverified, + error_if_tagged_old_versions, } => { stats.prune = Some( self.cleanup_old_versions( older_than.unwrap_or(Duration::try_days(7).expect("valid delta")), delete_unverified, + error_if_tagged_old_versions, ) .await?, );