diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e907714d..dd5eb5d9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.2.2 + rev: v0.8.4 hooks: - id: ruff - repo: local diff --git a/Cargo.lock b/Cargo.lock index c173c08b..149628f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4235,7 +4235,7 @@ dependencies = [ [[package]] name = "lancedb-python" -version = "0.18.1-beta.3" +version = "0.18.1-beta.4" dependencies = [ "arrow", "env_logger 0.10.2", diff --git a/docs/src/js/interfaces/CreateTableOptions.md b/docs/src/js/interfaces/CreateTableOptions.md index 655c1086..d69b98bb 100644 --- a/docs/src/js/interfaces/CreateTableOptions.md +++ b/docs/src/js/interfaces/CreateTableOptions.md @@ -8,7 +8,7 @@ ## Properties -### dataStorageVersion? +### ~~dataStorageVersion?~~ ```ts optional dataStorageVersion: string; @@ -19,6 +19,10 @@ The version of the data storage format to use. The default is `stable`. Set to "legacy" to use the old format. +#### Deprecated + +Pass `new_table_data_storage_version` to storageOptions instead. + *** ### embeddingFunction? @@ -29,7 +33,7 @@ optional embeddingFunction: EmbeddingFunctionConfig; *** -### enableV2ManifestPaths? +### ~~enableV2ManifestPaths?~~ ```ts optional enableV2ManifestPaths: boolean; @@ -41,6 +45,10 @@ turning this on will make the dataset unreadable for older versions of LanceDB (prior to 0.10.0). To migrate an existing dataset, instead use the LocalTable#migrateManifestPathsV2 method. +#### Deprecated + +Pass `new_table_enable_v2_manifest_paths` to storageOptions instead. + *** ### existOk @@ -90,17 +98,3 @@ Options already set on the connection will be inherited by the table, but can be overridden here. The available options are described at https://lancedb.github.io/lancedb/guides/storage/ - -*** - -### useLegacyFormat? 
- -```ts -optional useLegacyFormat: boolean; -``` - -If true then data files will be written with the legacy format - -The default is false. - -Deprecated. Use data storage version instead. diff --git a/nodejs/__test__/connection.test.ts b/nodejs/__test__/connection.test.ts index 43416a6b..d7a6e92a 100644 --- a/nodejs/__test__/connection.test.ts +++ b/nodejs/__test__/connection.test.ts @@ -17,14 +17,14 @@ describe("when connecting", () => { it("should connect", async () => { const db = await connect(tmpDir.name); expect(db.display()).toBe( - `NativeDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`, + `ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`, ); }); it("should allow read consistency interval to be specified", async () => { const db = await connect(tmpDir.name, { readConsistencyInterval: 5 }); expect(db.display()).toBe( - `NativeDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`, + `ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`, ); }); }); @@ -96,14 +96,15 @@ describe("given a connection", () => { const data = [...Array(10000).keys()].map((i) => ({ id: i })); // Create in v1 mode - let table = await db.createTable("test", data, { useLegacyFormat: true }); + let table = await db.createTable("test", data, { + storageOptions: { newTableDataStorageVersion: "legacy" }, + }); const isV2 = async (table: Table) => { const data = await table .query() .limit(10000) .toArrow({ maxBatchLength: 100000 }); - console.log(data.batches.length); return data.batches.length < 5; }; @@ -122,7 +123,7 @@ describe("given a connection", () => { const schema = new Schema([new Field("id", new Float64(), true)]); table = await db.createEmptyTable("test_v2_empty", schema, { - useLegacyFormat: false, + storageOptions: { newTableDataStorageVersion: "stable" }, }); await table.add(data); diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index da3d1d70..6df97185 100644 --- a/nodejs/lancedb/connection.ts +++ 
b/nodejs/lancedb/connection.ts @@ -52,6 +52,8 @@ export interface CreateTableOptions { * * The default is `stable`. * Set to "legacy" to use the old format. + * + * @deprecated Pass `new_table_data_storage_version` to storageOptions instead. */ dataStorageVersion?: string; @@ -61,17 +63,11 @@ export interface CreateTableOptions { * turning this on will make the dataset unreadable for older versions * of LanceDB (prior to 0.10.0). To migrate an existing dataset, instead * use the {@link LocalTable#migrateManifestPathsV2} method. + * + * @deprecated Pass `new_table_enable_v2_manifest_paths` to storageOptions instead. */ enableV2ManifestPaths?: boolean; - /** - * If true then data files will be written with the legacy format - * - * The default is false. - * - * Deprecated. Use data storage version instead. - */ - useLegacyFormat?: boolean; schema?: SchemaLike; embeddingFunction?: EmbeddingFunctionConfig; } @@ -256,6 +252,28 @@ export class LocalConnection extends Connection { return new LocalTable(innerTable); } + private getStorageOptions( + options?: Partial, + ): Record | undefined { + if (options?.dataStorageVersion !== undefined) { + if (options.storageOptions === undefined) { + options.storageOptions = {}; + } + options.storageOptions["newTableDataStorageVersion"] = + options.dataStorageVersion; + } + + if (options?.enableV2ManifestPaths !== undefined) { + if (options.storageOptions === undefined) { + options.storageOptions = {}; + } + options.storageOptions["newTableEnableV2ManifestPaths"] = + options.enableV2ManifestPaths ? 
"true" : "false"; + } + + return cleanseStorageOptions(options?.storageOptions); + } + async createTable( nameOrOptions: | string @@ -272,20 +290,14 @@ export class LocalConnection extends Connection { throw new Error("data is required"); } const { buf, mode } = await parseTableData(data, options); - let dataStorageVersion = "stable"; - if (options?.dataStorageVersion !== undefined) { - dataStorageVersion = options.dataStorageVersion; - } else if (options?.useLegacyFormat !== undefined) { - dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable"; - } + + const storageOptions = this.getStorageOptions(options); const innerTable = await this.inner.createTable( nameOrOptions, buf, mode, - cleanseStorageOptions(options?.storageOptions), - dataStorageVersion, - options?.enableV2ManifestPaths, + storageOptions, ); return new LocalTable(innerTable); @@ -309,22 +321,14 @@ export class LocalConnection extends Connection { metadata = registry.getTableMetadata([embeddingFunction]); } - let dataStorageVersion = "stable"; - if (options?.dataStorageVersion !== undefined) { - dataStorageVersion = options.dataStorageVersion; - } else if (options?.useLegacyFormat !== undefined) { - dataStorageVersion = options.useLegacyFormat ? 
"legacy" : "stable"; - } - + const storageOptions = this.getStorageOptions(options); const table = makeEmptyTable(schema, metadata); const buf = await fromTableToBuffer(table); const innerTable = await this.inner.createEmptyTable( name, buf, mode, - cleanseStorageOptions(options?.storageOptions), - dataStorageVersion, - options?.enableV2ManifestPaths, + storageOptions, ); return new LocalTable(innerTable); } diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index 2ec4b986..e6f3183e 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -2,17 +2,15 @@ // SPDX-FileCopyrightText: Copyright The LanceDB Authors use std::collections::HashMap; -use std::str::FromStr; +use lancedb::database::CreateTableMode; use napi::bindgen_prelude::*; use napi_derive::*; -use crate::error::{convert_error, NapiErrorExt}; +use crate::error::NapiErrorExt; use crate::table::Table; use crate::ConnectionOptions; -use lancedb::connection::{ - ConnectBuilder, Connection as LanceDBConnection, CreateTableMode, LanceFileVersion, -}; +use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection}; use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema}; #[napi] @@ -124,8 +122,6 @@ impl Connection { buf: Buffer, mode: String, storage_options: Option>, - data_storage_options: Option, - enable_v2_manifest_paths: Option, ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; @@ -137,14 +133,6 @@ impl Connection { builder = builder.storage_option(key, value); } } - if let Some(data_storage_option) = data_storage_options.as_ref() { - builder = builder.data_storage_version( - LanceFileVersion::from_str(data_storage_option).map_err(|e| convert_error(&e))?, - ); - } - if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths { - builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths); - } let tbl = builder.execute().await.default_error()?; 
Ok(Table::new(tbl)) } @@ -156,8 +144,6 @@ impl Connection { schema_buf: Buffer, mode: String, storage_options: Option>, - data_storage_options: Option, - enable_v2_manifest_paths: Option, ) -> napi::Result
{ let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| { napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e)) @@ -172,14 +158,6 @@ impl Connection { builder = builder.storage_option(key, value); } } - if let Some(data_storage_option) = data_storage_options.as_ref() { - builder = builder.data_storage_version( - LanceFileVersion::from_str(data_storage_option).map_err(|e| convert_error(&e))?, - ); - } - if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths { - builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths); - } let tbl = builder.execute().await.default_error()?; Ok(Table::new(tbl)) } diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 00000000..10d800aa --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,2 @@ +# Test data created by some example tests +data/ \ No newline at end of file diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index fb8c0ac9..993ac03a 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -15,8 +15,6 @@ class Connection(object): mode: str, data: pa.RecordBatchReader, storage_options: Optional[Dict[str, str]] = None, - data_storage_version: Optional[str] = None, - enable_v2_manifest_paths: Optional[bool] = None, ) -> Table: ... async def create_empty_table( self, @@ -24,8 +22,6 @@ class Connection(object): mode: str, schema: pa.Schema, storage_options: Optional[Dict[str, str]] = None, - data_storage_version: Optional[str] = None, - enable_v2_manifest_paths: Optional[bool] = None, ) -> Table: ... async def rename_table(self, old_name: str, new_name: str) -> None: ... async def drop_table(self, name: str) -> None: ... 
diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 813348b8..c0ffb3a9 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -119,19 +119,11 @@ class DBConnection(EnforceOverrides): See available options at data_storage_version: optional, str, default "stable" - The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "stable" which will use the legacy v2 version. See the user guide - for more details. - enable_v2_manifest_paths: bool, optional, default False - Use the new V2 manifest paths. These paths provide more efficient - opening of datasets with many versions on object stores. WARNING: - turning this on will make the dataset unreadable for older versions - of LanceDB (prior to 0.13.0). To migrate an existing dataset, instead - use the - [Table.migrate_manifest_paths_v2][lancedb.table.Table.migrate_v2_manifest_paths] - method. - + Deprecated. Set `storage_options` when connecting to the database and set + `new_table_data_storage_version` in the options. + enable_v2_manifest_paths: optional, bool, default False + Deprecated. Set `storage_options` when connecting to the database and set + `new_table_enable_v2_manifest_paths` in the options. Returns ------- LanceTable @@ -452,8 +444,6 @@ class LanceDBConnection(DBConnection): fill_value=fill_value, embedding_functions=embedding_functions, storage_options=storage_options, - data_storage_version=data_storage_version, - enable_v2_manifest_paths=enable_v2_manifest_paths, ) return tbl @@ -595,9 +585,6 @@ class AsyncConnection(object): storage_options: Optional[Dict[str, str]] = None, *, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, - data_storage_version: Optional[str] = None, - use_legacy_format: Optional[bool] = None, - enable_v2_manifest_paths: Optional[bool] = None, ) -> AsyncTable: """Create an [AsyncTable][lancedb.table.AsyncTable] in the database. 
@@ -640,23 +627,6 @@ class AsyncConnection(object): connection will be inherited by the table, but can be overridden here. See available options at - data_storage_version: optional, str, default "stable" - The version of the data storage format to use. Newer versions are more - efficient but require newer versions of lance to read. The default is - "stable" which will use the legacy v2 version. See the user guide - for more details. - use_legacy_format: bool, optional, default False. (Deprecated) - If True, use the legacy format for the table. If False, use the new format. - This method is deprecated, use `data_storage_version` instead. - enable_v2_manifest_paths: bool, optional, default False - Use the new V2 manifest paths. These paths provide more efficient - opening of datasets with many versions on object stores. WARNING: - turning this on will make the dataset unreadable for older versions - of LanceDB (prior to 0.13.0). To migrate an existing dataset, instead - use the - [AsyncTable.migrate_manifest_paths_v2][lancedb.table.AsyncTable.migrate_manifest_paths_v2] - method. 
- Returns ------- @@ -795,17 +765,12 @@ class AsyncConnection(object): if mode == "create" and exist_ok: mode = "exist_ok" - if not data_storage_version: - data_storage_version = "legacy" if use_legacy_format else "stable" - if data is None: new_table = await self._inner.create_empty_table( name, mode, schema, storage_options=storage_options, - data_storage_version=data_storage_version, - enable_v2_manifest_paths=enable_v2_manifest_paths, ) else: data = data_to_reader(data, schema) @@ -814,8 +779,6 @@ class AsyncConnection(object): mode, data, storage_options=storage_options, - data_storage_version=data_storage_version, - enable_v2_manifest_paths=enable_v2_manifest_paths, ) return AsyncTable(new_table) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index d67e1cb1..86a1d500 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -4,6 +4,7 @@ from __future__ import annotations import inspect +import warnings from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta @@ -2085,10 +2086,37 @@ class LanceTable(Table): The value to use when filling vectors. Only used if on_bad_vectors="fill". embedding_functions: list of EmbeddingFunctionModel, default None The embedding functions to use when creating the table. + data_storage_version: optional, str, default "stable" + Deprecated. Set `storage_options` when connecting to the database and set + `new_table_data_storage_version` in the options. + enable_v2_manifest_paths: optional, bool, default False + Deprecated. Set `storage_options` when connecting to the database and set + `new_table_enable_v2_manifest_paths` in the options. """ self = cls.__new__(cls) self._conn = db + if data_storage_version is not None: + warnings.warn( + "setting data_storage_version directly on create_table is deprecated. 
", + "Use database_options instead.", + DeprecationWarning, + ) + if storage_options is None: + storage_options = {} + storage_options["new_table_data_storage_version"] = data_storage_version + if enable_v2_manifest_paths is not None: + warnings.warn( + "setting enable_v2_manifest_paths directly on create_table is ", + "deprecated. Use database_options instead.", + DeprecationWarning, + ) + if storage_options is None: + storage_options = {} + storage_options["new_table_enable_v2_manifest_paths"] = ( + enable_v2_manifest_paths + ) + self._table = LOOP.run( self._conn._conn.create_table( name, @@ -2100,8 +2128,6 @@ class LanceTable(Table): fill_value=fill_value, embedding_functions=embedding_functions, storage_options=storage_options, - data_storage_version=data_storage_version, - enable_v2_manifest_paths=enable_v2_manifest_paths, ) ) return self diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index d7e2d0c4..a7cff5df 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -299,12 +299,12 @@ def test_create_exist_ok(tmp_db: lancedb.DBConnection): @pytest.mark.asyncio async def test_connect(tmp_path): db = await lancedb.connect_async(tmp_path) - assert str(db) == f"NativeDatabase(uri={tmp_path}, read_consistency_interval=None)" + assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)" db = await lancedb.connect_async( tmp_path, read_consistency_interval=timedelta(seconds=5) ) - assert str(db) == f"NativeDatabase(uri={tmp_path}, read_consistency_interval=5s)" + assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)" @pytest.mark.asyncio @@ -396,13 +396,16 @@ async def test_create_exist_ok_async(tmp_db_async: lancedb.AsyncConnection): @pytest.mark.asyncio async def test_create_table_v2_manifest_paths_async(tmp_path): - db = await lancedb.connect_async(tmp_path) + db_with_v2_paths = await lancedb.connect_async( + tmp_path, 
storage_options={"new_table_enable_v2_manifest_paths": "true"} + ) + db_no_v2_paths = await lancedb.connect_async( + tmp_path, storage_options={"new_table_enable_v2_manifest_paths": "false"} + ) # Create table in v2 mode with v2 manifest paths enabled - tbl = await db.create_table( + tbl = await db_with_v2_paths.create_table( "test_v2_manifest_paths", data=[{"id": 0}], - use_legacy_format=False, - enable_v2_manifest_paths=True, ) assert await tbl.uses_v2_manifest_paths() manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions" @@ -410,11 +413,9 @@ async def test_create_table_v2_manifest_paths_async(tmp_path): assert re.match(r"\d{20}\.manifest", manifest) # Start a table in V1 mode then migrate - tbl = await db.create_table( + tbl = await db_no_v2_paths.create_table( "test_v2_migration", data=[{"id": 0}], - use_legacy_format=False, - enable_v2_manifest_paths=False, ) assert not await tbl.uses_v2_manifest_paths() manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions" @@ -583,7 +584,7 @@ def test_empty_or_nonexistent_table(mem_db: lancedb.DBConnection): @pytest.mark.asyncio -async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection): +async def test_create_in_v2_mode(): def make_data(): for i in range(10): yield pa.record_batch([pa.array([x for x in range(1024)])], names=["x"]) @@ -594,10 +595,13 @@ async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection): schema = pa.schema([pa.field("x", pa.int64())]) # Create table in v1 mode - tbl = await mem_db_async.create_table( - "test", data=make_data(), schema=schema, data_storage_version="legacy" + + v1_db = await lancedb.connect_async( + "memory://", storage_options={"new_table_data_storage_version": "legacy"} ) + tbl = await v1_db.create_table("test", data=make_data(), schema=schema) + async def is_in_v2_mode(tbl): batches = ( await tbl.query().limit(10 * 1024).to_batches(max_batch_length=1024 * 10) @@ -610,10 +614,12 @@ async def test_create_in_v2_mode(mem_db_async: 
lancedb.AsyncConnection): assert not await is_in_v2_mode(tbl) # Create table in v2 mode - tbl = await mem_db_async.create_table( - "test_v2", data=make_data(), schema=schema, use_legacy_format=False + v2_db = await lancedb.connect_async( + "memory://", storage_options={"new_table_data_storage_version": "stable"} ) + tbl = await v2_db.create_table("test_v2", data=make_data(), schema=schema) + assert await is_in_v2_mode(tbl) # Add data (should remain in v2 mode) @@ -622,20 +628,18 @@ async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection): assert await is_in_v2_mode(tbl) # Create empty table in v2 mode and add data - tbl = await mem_db_async.create_table( - "test_empty_v2", data=None, schema=schema, use_legacy_format=False - ) + tbl = await v2_db.create_table("test_empty_v2", data=None, schema=schema) await tbl.add(make_table()) assert await is_in_v2_mode(tbl) - # Create empty table uses v1 mode by default - tbl = await mem_db_async.create_table( - "test_empty_v2_default", data=None, schema=schema, data_storage_version="legacy" - ) + # Db uses v2 mode by default + db = await lancedb.connect_async("memory://") + + tbl = await db.create_table("test_empty_v2_default", data=None, schema=schema) await tbl.add(make_table()) - assert not await is_in_v2_mode(tbl) + assert await is_in_v2_mode(tbl) def test_replace_index(mem_db: lancedb.DBConnection): diff --git a/python/src/connection.rs b/python/src/connection.rs index d1263f24..d71da5e1 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -1,10 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow}; -use lancedb::connection::{Connection as LanceConnection, CreateTableMode, LanceFileVersion}; +use 
lancedb::{connection::Connection as LanceConnection, database::CreateTableMode}; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, pyclass, pyfunction, pymethods, Bound, FromPyObject, PyAny, PyRef, PyResult, Python, @@ -80,15 +80,13 @@ impl Connection { future_into_py(self_.py(), async move { op.execute().await.infer_error() }) } - #[pyo3(signature = (name, mode, data, storage_options=None, data_storage_version=None, enable_v2_manifest_paths=None))] + #[pyo3(signature = (name, mode, data, storage_options=None))] pub fn create_table<'a>( self_: PyRef<'a, Self>, name: String, mode: &str, data: Bound<'_, PyAny>, storage_options: Option>, - data_storage_version: Option, - enable_v2_manifest_paths: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -101,32 +99,19 @@ impl Connection { builder = builder.storage_options(storage_options); } - if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths { - builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths); - } - - if let Some(data_storage_version) = data_storage_version.as_ref() { - builder = builder.data_storage_version( - LanceFileVersion::from_str(data_storage_version) - .map_err(|e| PyValueError::new_err(e.to_string()))?, - ); - } - future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; Ok(Table::new(table)) }) } - #[pyo3(signature = (name, mode, schema, storage_options=None, data_storage_version=None, enable_v2_manifest_paths=None))] + #[pyo3(signature = (name, mode, schema, storage_options=None))] pub fn create_empty_table<'a>( self_: PyRef<'a, Self>, name: String, mode: &str, schema: Bound<'_, PyAny>, storage_options: Option>, - data_storage_version: Option, - enable_v2_manifest_paths: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -140,17 +125,6 @@ impl Connection { builder = builder.storage_options(storage_options); } - if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths { - builder = 
builder.enable_v2_manifest_paths(enable_v2_manifest_paths); - } - - if let Some(data_storage_version) = data_storage_version.as_ref() { - builder = builder.data_storage_version( - LanceFileVersion::from_str(data_storage_version) - .map_err(|e| PyValueError::new_err(e.to_string()))?, - ); - } - future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; Ok(Table::new(table)) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 6100abfc..6f58a5f1 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -1,95 +1,47 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The LanceDB Authors -//! LanceDB Database +//! Functions to establish a connection to a LanceDB database use std::collections::HashMap; -use std::fs::create_dir_all; -use std::path::Path; use std::sync::Arc; -use arrow_array::{RecordBatchIterator, RecordBatchReader}; -use arrow_schema::SchemaRef; -use lance::dataset::{ReadParams, WriteMode}; -use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore}; -use object_store::{aws::AwsCredential, local::LocalFileSystem}; -use snafu::prelude::*; +use arrow_array::RecordBatchReader; +use arrow_schema::{Field, SchemaRef}; +use lance::dataset::ReadParams; +use object_store::aws::AwsCredential; use crate::arrow::IntoArrow; +use crate::database::listing::{ + ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, +}; +use crate::database::{ + CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, + OpenTableRequest, TableNamesRequest, +}; use crate::embeddings::{ EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings, }; -use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result}; -use crate::io::object_store::MirroringObjectStoreWrapper; +use crate::error::{Error, Result}; #[cfg(feature = "remote")] use 
crate::remote::client::ClientConfig; -use crate::table::{NativeTable, TableDefinition, WriteOptions}; -use crate::utils::validate_table_name; +use crate::table::{TableDefinition, WriteOptions}; use crate::Table; pub use lance_encoding::version::LanceFileVersion; #[cfg(feature = "remote")] use lance_io::object_store::StorageOptions; -use lance_table::io::commit::commit_handler_from_url; - -pub const LANCE_FILE_EXTENSION: &str = "lance"; - -pub type TableBuilderCallback = Box OpenTableBuilder + Send>; - -/// Describes what happens when creating a table and a table with -/// the same name already exists -pub enum CreateTableMode { - /// If the table already exists, an error is returned - Create, - /// If the table already exists, it is opened. Any provided data is - /// ignored. The function will be passed an OpenTableBuilder to customize - /// how the table is opened - ExistOk(TableBuilderCallback), - /// If the table already exists, it is overwritten - Overwrite, -} - -impl CreateTableMode { - pub fn exist_ok( - callback: impl FnOnce(OpenTableBuilder) -> OpenTableBuilder + Send + 'static, - ) -> Self { - Self::ExistOk(Box::new(callback)) - } -} - -impl Default for CreateTableMode { - fn default() -> Self { - Self::Create - } -} - -/// Describes what happens when a vector either contains NaN or -/// does not have enough values -#[derive(Clone, Debug, Default)] -enum BadVectorHandling { - /// An error is returned - #[default] - Error, - #[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992 - /// The offending row is droppped - Drop, - #[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992 - /// The invalid/missing items are replaced by fill_value - Fill(f32), -} /// A builder for configuring a [`Connection::table_names`] operation pub struct TableNamesBuilder { - parent: Arc, - pub(crate) start_after: Option, - pub(crate) limit: Option, + parent: Arc, + request: TableNamesRequest, } impl TableNamesBuilder { - fn new(parent: Arc) -> Self { 
+ fn new(parent: Arc) -> Self { Self { parent, - start_after: None, - limit: None, + request: TableNamesRequest::default(), } } @@ -99,19 +51,19 @@ impl TableNamesBuilder { /// This can be combined with limit to implement pagination by setting this to /// the last table name from the previous page. pub fn start_after(mut self, start_after: impl Into) -> Self { - self.start_after = Some(start_after.into()); + self.request.start_after = Some(start_after.into()); self } /// The maximum number of table names to return pub fn limit(mut self, limit: u32) -> Self { - self.limit = Some(limit); + self.request.limit = Some(limit); self } /// Execute the table names operation pub async fn execute(self) -> Result> { - self.parent.clone().table_names(self).await + self.parent.clone().table_names(self.request).await } } @@ -124,98 +76,101 @@ impl IntoArrow for NoData { } /// A builder for configuring a [`Connection::create_table`] operation -pub struct CreateTableBuilder { - pub(crate) parent: Arc, - pub(crate) name: String, - pub(crate) data: Option, - pub(crate) mode: CreateTableMode, - pub(crate) write_options: WriteOptions, - pub(crate) table_definition: Option, - pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc)>, - pub(crate) data_storage_version: Option, - pub(crate) enable_v2_manifest_paths: Option, +pub struct CreateTableBuilder { + parent: Arc, + embeddings: Vec<(EmbeddingDefinition, Arc)>, + embedding_registry: Arc, + request: CreateTableRequest, + // This is a bit clumsy but we defer errors until `execute` is called + // to maintain backwards compatibility + data: Option>>, } // Builder methods that only apply when we have initial data -impl CreateTableBuilder { - fn new(parent: Arc, name: String, data: T) -> Self { +impl CreateTableBuilder { + fn new( + parent: Arc, + name: String, + data: T, + embedding_registry: Arc, + ) -> Self { + let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::::default())); Self { parent, - name, - data: Some(data), - mode: 
CreateTableMode::default(), - write_options: WriteOptions::default(), - table_definition: None, + request: CreateTableRequest::new( + name, + CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)), + ), embeddings: Vec::new(), - data_storage_version: None, - enable_v2_manifest_paths: None, + embedding_registry, + data: Some(data.into_arrow()), } } /// Apply the given write options when writing the initial data pub fn write_options(mut self, write_options: WriteOptions) -> Self { - self.write_options = write_options; + self.request.write_options = write_options; self } /// Execute the create table operation pub async fn execute(self) -> Result
{ + let embedding_registry = self.embedding_registry.clone(); let parent = self.parent.clone(); - let (data, builder) = self.extract_data()?; - parent.do_create_table(builder, data).await + let request = self.into_request()?; + Ok(Table::new_with_embedding_registry( + parent.create_table(request).await?, + embedding_registry, + )) } - fn extract_data( - mut self, - ) -> Result<( - Box, - CreateTableBuilder, - )> { - let data = self.data.take().unwrap().into_arrow()?; - let builder = CreateTableBuilder:: { - parent: self.parent, - name: self.name, - data: None, - table_definition: self.table_definition, - mode: self.mode, - write_options: self.write_options, - embeddings: self.embeddings, - data_storage_version: self.data_storage_version, - enable_v2_manifest_paths: self.enable_v2_manifest_paths, + fn into_request(self) -> Result { + let data = if self.embeddings.is_empty() { + self.data.unwrap()? + } else { + let data = self.data.unwrap()?; + Box::new(WithEmbeddings::new(data, self.embeddings)) }; - Ok((data, builder)) + let req = self.request; + Ok(CreateTableRequest { + data: CreateTableData::Data(data), + ..req + }) } } // Builder methods that only apply when we do not have initial data -impl CreateTableBuilder { - fn new(parent: Arc, name: String, schema: SchemaRef) -> Self { +impl CreateTableBuilder { + fn new( + parent: Arc, + name: String, + schema: SchemaRef, + embedding_registry: Arc, + ) -> Self { let table_definition = TableDefinition::new_from_schema(schema); Self { parent, - name, + request: CreateTableRequest::new(name, CreateTableData::Empty(table_definition)), data: None, - table_definition: Some(table_definition), - mode: CreateTableMode::default(), - write_options: WriteOptions::default(), - embeddings: Vec::new(), - data_storage_version: None, - enable_v2_manifest_paths: None, + embeddings: Vec::default(), + embedding_registry, } } /// Execute the create table operation pub async fn execute(self) -> Result
{ - self.parent.clone().do_create_empty_table(self).await + Ok(Table::new( + self.parent.clone().create_table(self.request).await?, + )) } } -impl CreateTableBuilder { +impl CreateTableBuilder { /// Set the mode for creating the table /// /// This controls what happens if a table with the given name already exists pub fn mode(mut self, mode: CreateTableMode) -> Self { - self.mode = mode; + self.request.mode = mode; self } @@ -227,6 +182,7 @@ impl CreateTableBuilder { /// See available options at pub fn storage_option(mut self, key: impl Into, value: impl Into) -> Self { let store_options = self + .request .write_options .lance_write_params .get_or_insert(Default::default()) @@ -249,6 +205,7 @@ impl CreateTableBuilder { pairs: impl IntoIterator, impl Into)>, ) -> Self { let store_options = self + .request .write_options .lance_write_params .get_or_insert(Default::default()) @@ -263,6 +220,25 @@ impl CreateTableBuilder { self } + /// Add an embedding definition to the table. + /// + /// The `embedding_name` must match the name of an embedding function that + /// was previously registered with the connection's [`EmbeddingRegistry`]. + pub fn add_embedding(mut self, definition: EmbeddingDefinition) -> Result { + // Early verification of the embedding name + let embedding_func = self + .embedding_registry + .get(&definition.embedding_name) + .ok_or_else(|| Error::EmbeddingFunctionNotFound { + name: definition.embedding_name.clone(), + reason: "No embedding function found in the connection's embedding_registry" + .to_string(), + })?; + + self.embeddings.push((definition, embedding_func)); + Ok(self) + } + /// Set whether to use V2 manifest paths for the table. (default: false) /// /// These paths provide more efficient opening of tables with many @@ -275,70 +251,73 @@ impl CreateTableBuilder { /// [[NativeTable::migrate_manifest_paths_v2]]. /// /// This has no effect in LanceDB Cloud. 
+ #[deprecated(since = "0.15.1", note = "Use `database_options` instead")] pub fn enable_v2_manifest_paths(mut self, use_v2_manifest_paths: bool) -> Self { - self.enable_v2_manifest_paths = Some(use_v2_manifest_paths); + let storage_options = self + .request + .write_options + .lance_write_params + .get_or_insert_with(Default::default) + .store_params + .get_or_insert_with(Default::default) + .storage_options + .get_or_insert_with(Default::default); + + storage_options.insert( + OPT_NEW_TABLE_V2_MANIFEST_PATHS.to_string(), + if use_v2_manifest_paths { + "true".to_string() + } else { + "false".to_string() + }, + ); self } /// Set the data storage version. /// /// The default is `LanceFileVersion::Stable`. + #[deprecated(since = "0.15.1", note = "Use `database_options` instead")] pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self { - self.data_storage_version = Some(data_storage_version); + let storage_options = self + .request + .write_options + .lance_write_params + .get_or_insert_with(Default::default) + .store_params + .get_or_insert_with(Default::default) + .storage_options + .get_or_insert_with(Default::default); + + storage_options.insert( + OPT_NEW_TABLE_STORAGE_VERSION.to_string(), + data_storage_version.to_string(), + ); self } - - /// Set to true to use the v1 format for data files - /// - /// This is set to false by default to enable the stable format. - /// This should only be used for experimentation and - /// evaluation. This option may be removed in the future releases. - #[deprecated(since = "0.9.0", note = "use data_storage_version instead")] - pub fn use_legacy_format(mut self, use_legacy_format: bool) -> Self { - self.data_storage_version = if use_legacy_format { - Some(LanceFileVersion::Legacy) - } else { - Some(LanceFileVersion::Stable) - }; - self - } - - /// Add an embedding definition to the table. 
- /// - /// The `embedding_name` must match the name of an embedding function that - /// was previously registered with the connection's [`EmbeddingRegistry`]. - pub fn add_embedding(mut self, definition: EmbeddingDefinition) -> Result { - // Early verification of the embedding name - let embedding_func = self - .parent - .embedding_registry() - .get(&definition.embedding_name) - .ok_or_else(|| Error::EmbeddingFunctionNotFound { - name: definition.embedding_name.clone(), - reason: "No embedding function found in the connection's embedding_registry" - .to_string(), - })?; - - self.embeddings.push((definition, embedding_func)); - Ok(self) - } } #[derive(Clone, Debug)] pub struct OpenTableBuilder { - pub(crate) parent: Arc, - pub(crate) name: String, - index_cache_size: u32, - lance_read_params: Option, + parent: Arc, + request: OpenTableRequest, + embedding_registry: Arc, } impl OpenTableBuilder { - pub(crate) fn new(parent: Arc, name: String) -> Self { + pub(crate) fn new( + parent: Arc, + name: String, + embedding_registry: Arc, + ) -> Self { Self { parent, - name, - index_cache_size: 256, - lance_read_params: None, + request: OpenTableRequest { + name, + index_cache_size: None, + lance_read_params: None, + }, + embedding_registry, } } @@ -354,7 +333,7 @@ impl OpenTableBuilder { /// Setting this value higher will increase performance on larger datasets /// at the expense of more RAM pub fn index_cache_size(mut self, index_cache_size: u32) -> Self { - self.index_cache_size = index_cache_size; + self.request.index_cache_size = Some(index_cache_size); self } @@ -362,7 +341,7 @@ impl OpenTableBuilder { /// /// If set, these will take precedence over any overlapping `OpenTableOptions` options pub fn lance_read_params(mut self, params: ReadParams) -> Self { - self.lance_read_params = Some(params); + self.request.lance_read_params = Some(params); self } @@ -374,6 +353,7 @@ impl OpenTableBuilder { /// See available options at pub fn storage_option(mut self, key: impl Into, 
value: impl Into) -> Self { let storage_options = self + .request .lance_read_params .get_or_insert(Default::default()) .store_options @@ -395,6 +375,7 @@ impl OpenTableBuilder { pairs: impl IntoIterator, impl Into)>, ) -> Self { let storage_options = self + .request .lance_read_params .get_or_insert(Default::default()) .store_options @@ -410,35 +391,10 @@ impl OpenTableBuilder { /// Open the table pub async fn execute(self) -> Result
{ - self.parent.clone().do_open_table(self).await - } -} - -#[async_trait::async_trait] -pub(crate) trait ConnectionInternal: - Send + Sync + std::fmt::Debug + std::fmt::Display + 'static -{ - fn embedding_registry(&self) -> &dyn EmbeddingRegistry; - async fn table_names(&self, options: TableNamesBuilder) -> Result>; - async fn do_create_table( - &self, - options: CreateTableBuilder, - data: Box, - ) -> Result
; - async fn do_open_table(&self, options: OpenTableBuilder) -> Result
; - async fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()>; - async fn drop_table(&self, name: &str) -> Result<()>; - async fn drop_db(&self) -> Result<()>; - - async fn do_create_empty_table( - &self, - options: CreateTableBuilder, - ) -> Result
{ - let batches = Box::new(RecordBatchIterator::new( - vec![], - options.table_definition.clone().unwrap().schema.clone(), - )); - self.do_create_table(options, batches).await + Ok(Table::new_with_embedding_registry( + self.parent.clone().open_table(self.request).await?, + self.embedding_registry, + )) } } @@ -446,7 +402,8 @@ pub(crate) trait ConnectionInternal: #[derive(Clone)] pub struct Connection { uri: String, - internal: Arc, + internal: Arc, + embedding_registry: Arc, } impl std::fmt::Display for Connection { @@ -480,8 +437,13 @@ impl Connection { &self, name: impl Into, initial_data: T, - ) -> CreateTableBuilder { - CreateTableBuilder::::new(self.internal.clone(), name.into(), initial_data) + ) -> CreateTableBuilder { + CreateTableBuilder::::new( + self.internal.clone(), + name.into(), + initial_data, + self.embedding_registry.clone(), + ) } /// Create an empty table with a given schema @@ -494,8 +456,13 @@ impl Connection { &self, name: impl Into, schema: SchemaRef, - ) -> CreateTableBuilder { - CreateTableBuilder::::new(self.internal.clone(), name.into(), schema) + ) -> CreateTableBuilder { + CreateTableBuilder::::new( + self.internal.clone(), + name.into(), + schema, + self.embedding_registry.clone(), + ) } /// Open an existing table in the database @@ -506,7 +473,11 @@ impl Connection { /// # Returns /// Created [`TableRef`], or [`Error::TableNotFound`] if the table does not exist. pub fn open_table(&self, name: impl Into) -> OpenTableBuilder { - OpenTableBuilder::new(self.internal.clone(), name.into()) + OpenTableBuilder::new( + self.internal.clone(), + name.into(), + self.embedding_registry.clone(), + ) } /// Rename a table in the database. @@ -541,12 +512,13 @@ impl Connection { /// It's important to note that the embedding registry is not persisted across connections. 
/// So if a table contains embeddings, you will need to make sure that you are using a connection that has the same embedding functions registered pub fn embedding_registry(&self) -> &dyn EmbeddingRegistry { - self.internal.embedding_registry() + self.embedding_registry.as_ref() } } -#[derive(Debug)] -pub struct ConnectBuilder { +/// A request to connect to a database +#[derive(Clone, Debug)] +pub struct ConnectRequest { /// Database URI /// /// ### Accpeted URI formats @@ -554,18 +526,18 @@ pub struct ConnectBuilder { /// - `/path/to/database` - local database on file system. /// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store /// - `db://dbname` - LanceDB Cloud - uri: String, + pub uri: String, /// LanceDB Cloud API key, required if using Lance Cloud - api_key: Option, + pub api_key: Option, /// LanceDB Cloud region, required if using Lance Cloud - region: Option, + pub region: Option, /// LanceDB Cloud host override, only required if using an on-premises Lance Cloud instance - host_override: Option, + pub host_override: Option, #[cfg(feature = "remote")] - client_config: ClientConfig, + pub client_config: ClientConfig, - storage_options: HashMap, + pub storage_options: HashMap, /// The interval at which to check for updates from other processes. /// @@ -577,7 +549,12 @@ pub struct ConnectBuilder { /// the last check, then the table will be checked for updates. Note: this /// consistency only applies to read operations. Write operations are /// always consistent. - read_consistency_interval: Option, + pub read_consistency_interval: Option, +} + +#[derive(Debug)] +pub struct ConnectBuilder { + request: ConnectRequest, embedding_registry: Option>, } @@ -585,30 +562,37 @@ impl ConnectBuilder { /// Create a new [`ConnectOptions`] with the given database URI. 
pub fn new(uri: &str) -> Self { Self { - uri: uri.to_string(), - api_key: None, - region: None, - host_override: None, - #[cfg(feature = "remote")] - client_config: Default::default(), - read_consistency_interval: None, - storage_options: HashMap::new(), + request: ConnectRequest { + uri: uri.to_string(), + api_key: None, + region: None, + host_override: None, + #[cfg(feature = "remote")] + client_config: Default::default(), + read_consistency_interval: None, + storage_options: HashMap::new(), + }, embedding_registry: None, } } pub fn api_key(mut self, api_key: &str) -> Self { - self.api_key = Some(api_key.to_string()); + self.request.api_key = Some(api_key.to_string()); self } pub fn region(mut self, region: &str) -> Self { - self.region = Some(region.to_string()); + self.request.region = Some(region.to_string()); self } pub fn host_override(mut self, host_override: &str) -> Self { - self.host_override = Some(host_override.to_string()); + self.request.host_override = Some(host_override.to_string()); + self + } + + pub fn database_options(mut self, database_options: &dyn DatabaseOptions) -> Self { + database_options.serialize_into_map(&mut self.request.storage_options); self } @@ -632,7 +616,7 @@ impl ConnectBuilder { /// ``` #[cfg(feature = "remote")] pub fn client_config(mut self, config: ClientConfig) -> Self { - self.client_config = config; + self.request.client_config = config; self } @@ -645,12 +629,15 @@ impl ConnectBuilder { /// [`AwsCredential`] to use when connecting to S3. 
#[deprecated(note = "Pass through storage_options instead")] pub fn aws_creds(mut self, aws_creds: AwsCredential) -> Self { - self.storage_options + self.request + .storage_options .insert("aws_access_key_id".into(), aws_creds.key_id.clone()); - self.storage_options + self.request + .storage_options .insert("aws_secret_access_key".into(), aws_creds.secret_key.clone()); if let Some(token) = &aws_creds.token { - self.storage_options + self.request + .storage_options .insert("aws_session_token".into(), token.clone()); } self @@ -660,7 +647,9 @@ impl ConnectBuilder { /// /// See available options at pub fn storage_option(mut self, key: impl Into, value: impl Into) -> Self { - self.storage_options.insert(key.into(), value.into()); + self.request + .storage_options + .insert(key.into(), value.into()); self } @@ -672,7 +661,9 @@ impl ConnectBuilder { pairs: impl IntoIterator, impl Into)>, ) -> Self { for (key, value) in pairs { - self.storage_options.insert(key.into(), value.into()); + self.request + .storage_options + .insert(key.into(), value.into()); } self } @@ -696,31 +687,34 @@ impl ConnectBuilder { mut self, read_consistency_interval: std::time::Duration, ) -> Self { - self.read_consistency_interval = Some(read_consistency_interval); + self.request.read_consistency_interval = Some(read_consistency_interval); self } #[cfg(feature = "remote")] fn execute_remote(self) -> Result { - let region = self.region.ok_or_else(|| Error::InvalidInput { + let region = self.request.region.ok_or_else(|| Error::InvalidInput { message: "A region is required when connecting to LanceDb Cloud".to_string(), })?; - let api_key = self.api_key.ok_or_else(|| Error::InvalidInput { + let api_key = self.request.api_key.ok_or_else(|| Error::InvalidInput { message: "An api_key is required when connecting to LanceDb Cloud".to_string(), })?; - let storage_options = StorageOptions(self.storage_options.clone()); + let storage_options = StorageOptions(self.request.storage_options.clone()); let 
internal = Arc::new(crate::remote::db::RemoteDatabase::try_new( - &self.uri, + &self.request.uri, &api_key, ®ion, - self.host_override, - self.client_config, + self.request.host_override, + self.request.client_config, storage_options.into(), )?); Ok(Connection { internal, - uri: self.uri, + uri: self.request.uri, + embedding_registry: self + .embedding_registry + .unwrap_or_else(|| Arc::new(MemoryRegistry::new())), }) } @@ -734,13 +728,16 @@ impl ConnectBuilder { /// Establishes a connection to the database pub async fn execute(self) -> Result { - if self.uri.starts_with("db") { + if self.request.uri.starts_with("db") { self.execute_remote() } else { - let internal = Arc::new(Database::connect_with_options(&self).await?); + let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?); Ok(Connection { internal, - uri: self.uri, + uri: self.request.uri, + embedding_registry: self + .embedding_registry + .unwrap_or_else(|| Arc::new(MemoryRegistry::new())), }) } } @@ -756,418 +753,6 @@ pub fn connect(uri: &str) -> ConnectBuilder { ConnectBuilder::new(uri) } -#[derive(Debug)] -struct Database { - object_store: ObjectStore, - query_string: Option, - - pub(crate) uri: String, - pub(crate) base_path: object_store::path::Path, - - // the object store wrapper to use on write path - pub(crate) store_wrapper: Option>, - - read_consistency_interval: Option, - - // Storage options to be inherited by tables created from this connection - storage_options: HashMap, - embedding_registry: Arc, -} - -impl std::fmt::Display for Database { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "NativeDatabase(uri={}, read_consistency_interval={})", - self.uri, - match self.read_consistency_interval { - None => { - "None".to_string() - } - Some(duration) => { - format!("{}s", duration.as_secs_f64()) - } - } - ) - } -} - -const LANCE_EXTENSION: &str = "lance"; -const ENGINE: &str = "engine"; -const MIRRORED_STORE: &str = 
"mirroredStore"; - -/// A connection to LanceDB -impl Database { - async fn connect_with_options(options: &ConnectBuilder) -> Result { - let uri = &options.uri; - let parse_res = url::Url::parse(uri); - - // TODO: pass params regardless of OS - match parse_res { - Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { - Self::open_path( - uri, - options.read_consistency_interval, - options.embedding_registry.clone(), - ) - .await - } - Ok(mut url) => { - // iter thru the query params and extract the commit store param - let mut engine = None; - let mut mirrored_store = None; - let mut filtered_querys = vec![]; - - // WARNING: specifying engine is NOT a publicly supported feature in lancedb yet - // THE API WILL CHANGE - for (key, value) in url.query_pairs() { - if key == ENGINE { - engine = Some(value.to_string()); - } else if key == MIRRORED_STORE { - if cfg!(windows) { - return Err(Error::NotSupported { - message: "mirrored store is not supported on windows".into(), - }); - } - mirrored_store = Some(value.to_string()); - } else { - // to owned so we can modify the url - filtered_querys.push((key.to_string(), value.to_string())); - } - } - - // Filter out the commit store query param -- it's a lancedb param - url.query_pairs_mut().clear(); - url.query_pairs_mut().extend_pairs(filtered_querys); - // Take a copy of the query string so we can propagate it to lance - let query_string = url.query().map(|s| s.to_string()); - // clear the query string so we can use the url as the base uri - // use .set_query(None) instead of .set_query("") because the latter - // will add a trailing '?' to the url - url.set_query(None); - - let table_base_uri = if let Some(store) = engine { - static WARN_ONCE: std::sync::Once = std::sync::Once::new(); - WARN_ONCE.call_once(|| { - log::warn!("Specifying engine is not a publicly supported feature in lancedb yet. 
THE API WILL CHANGE"); - }); - let old_scheme = url.scheme().to_string(); - let new_scheme = format!("{}+{}", old_scheme, store); - url.to_string().replacen(&old_scheme, &new_scheme, 1) - } else { - url.to_string() - }; - - let plain_uri = url.to_string(); - - let registry = Arc::new(ObjectStoreRegistry::default()); - let storage_options = options.storage_options.clone(); - let os_params = ObjectStoreParams { - storage_options: Some(storage_options.clone()), - ..Default::default() - }; - let (object_store, base_path) = - ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?; - if object_store.is_local() { - Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; - } - - let write_store_wrapper = match mirrored_store { - Some(path) => { - let mirrored_store = Arc::new(LocalFileSystem::new_with_prefix(path)?); - let wrapper = MirroringObjectStoreWrapper::new(mirrored_store); - Some(Arc::new(wrapper) as Arc) - } - None => None, - }; - - let embedding_registry = options - .embedding_registry - .clone() - .unwrap_or_else(|| Arc::new(MemoryRegistry::new())); - Ok(Self { - uri: table_base_uri, - query_string, - base_path, - object_store, - store_wrapper: write_store_wrapper, - read_consistency_interval: options.read_consistency_interval, - storage_options, - embedding_registry, - }) - } - Err(_) => { - Self::open_path( - uri, - options.read_consistency_interval, - options.embedding_registry.clone(), - ) - .await - } - } - } - - async fn open_path( - path: &str, - read_consistency_interval: Option, - embedding_registry: Option>, - ) -> Result { - let (object_store, base_path) = ObjectStore::from_uri(path).await?; - if object_store.is_local() { - Self::try_create_dir(path).context(CreateDirSnafu { path })?; - } - - let embedding_registry = - embedding_registry.unwrap_or_else(|| Arc::new(MemoryRegistry::new())); - - Ok(Self { - uri: path.to_string(), - query_string: None, - base_path, - object_store, - store_wrapper: None, - 
read_consistency_interval, - storage_options: HashMap::new(), - embedding_registry, - }) - } - - /// Try to create a local directory to store the lancedb dataset - fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> { - let path = Path::new(path); - if !path.try_exists()? { - create_dir_all(path)?; - } - Ok(()) - } - - /// Get the URI of a table in the database. - fn table_uri(&self, name: &str) -> Result { - validate_table_name(name)?; - - let path = Path::new(&self.uri); - let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); - - let mut uri = table_uri - .as_path() - .to_str() - .context(InvalidTableNameSnafu { - name, - reason: "Name is not valid URL", - })? - .to_string(); - - // If there are query string set on the connection, propagate to lance - if let Some(query) = self.query_string.as_ref() { - uri.push('?'); - uri.push_str(query.as_str()); - } - - Ok(uri) - } -} - -#[async_trait::async_trait] -impl ConnectionInternal for Database { - fn embedding_registry(&self) -> &dyn EmbeddingRegistry { - self.embedding_registry.as_ref() - } - async fn table_names(&self, options: TableNamesBuilder) -> Result> { - let mut f = self - .object_store - .read_dir(self.base_path.clone()) - .await? - .iter() - .map(Path::new) - .filter(|path| { - let is_lance = path - .extension() - .and_then(|e| e.to_str()) - .map(|e| e == LANCE_EXTENSION); - is_lance.unwrap_or(false) - }) - .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) - .collect::>(); - f.sort(); - if let Some(start_after) = options.start_after { - let index = f - .iter() - .position(|name| name.as_str() > start_after.as_str()) - .unwrap_or(f.len()); - f.drain(0..index); - } - if let Some(limit) = options.limit { - f.truncate(limit as usize); - } - Ok(f) - } - - async fn do_create_table( - &self, - mut options: CreateTableBuilder, - data: Box, - ) -> Result
{ - let table_uri = self.table_uri(&options.name)?; - let embedding_registry = self.embedding_registry.clone(); - // Inherit storage options from the connection - let storage_options = options - .write_options - .lance_write_params - .get_or_insert_with(Default::default) - .store_params - .get_or_insert_with(Default::default) - .storage_options - .get_or_insert_with(Default::default); - for (key, value) in self.storage_options.iter() { - if !storage_options.contains_key(key) { - storage_options.insert(key.clone(), value.clone()); - } - } - let data = if options.embeddings.is_empty() { - data - } else { - Box::new(WithEmbeddings::new(data, options.embeddings)) - }; - - let mut write_params = options.write_options.lance_write_params.unwrap_or_default(); - - if matches!(&options.mode, CreateTableMode::Overwrite) { - write_params.mode = WriteMode::Overwrite; - } - - write_params.data_storage_version = options.data_storage_version; - write_params.enable_v2_manifest_paths = - options.enable_v2_manifest_paths.unwrap_or_default(); - - let data_schema = data.schema(); - - match NativeTable::create( - &table_uri, - &options.name, - data, - self.store_wrapper.clone(), - Some(write_params), - self.read_consistency_interval, - ) - .await - { - Ok(table) => Ok(Table::new_with_embedding_registry( - Arc::new(table), - embedding_registry, - )), - Err(Error::TableAlreadyExists { name }) => match options.mode { - CreateTableMode::Create => Err(Error::TableAlreadyExists { name }), - CreateTableMode::ExistOk(callback) => { - let builder = OpenTableBuilder::new(options.parent, options.name); - let builder = (callback)(builder); - let table = builder.execute().await?; - - let table_schema = table.schema().await?; - - if table_schema != data_schema { - return Err(Error::Schema { - message: "Provided schema does not match existing table schema" - .to_string(), - }); - } - - Ok(table) - } - CreateTableMode::Overwrite => unreachable!(), - }, - Err(err) => Err(err), - } - } - - async fn 
do_open_table(&self, mut options: OpenTableBuilder) -> Result
{ - let table_uri = self.table_uri(&options.name)?; - let embedding_registry = self.embedding_registry.clone(); - - // Inherit storage options from the connection - let storage_options = options - .lance_read_params - .get_or_insert_with(Default::default) - .store_options - .get_or_insert_with(Default::default) - .storage_options - .get_or_insert_with(Default::default); - for (key, value) in self.storage_options.iter() { - if !storage_options.contains_key(key) { - storage_options.insert(key.clone(), value.clone()); - } - } - - // Some ReadParams are exposed in the OpenTableBuilder, but we also - // let the user provide their own ReadParams. - // - // If we have a user provided ReadParams use that - // If we don't then start with the default ReadParams and customize it with - // the options from the OpenTableBuilder - let read_params = options.lance_read_params.unwrap_or_else(|| ReadParams { - index_cache_size: options.index_cache_size as usize, - ..Default::default() - }); - - let native_table = Arc::new( - NativeTable::open_with_params( - &table_uri, - &options.name, - self.store_wrapper.clone(), - Some(read_params), - self.read_consistency_interval, - ) - .await?, - ); - Ok(Table::new_with_embedding_registry( - native_table, - embedding_registry, - )) - } - - async fn rename_table(&self, _old_name: &str, _new_name: &str) -> Result<()> { - Err(Error::NotSupported { - message: "rename_table is not supported in LanceDB OSS".to_string(), - }) - } - - async fn drop_table(&self, name: &str) -> Result<()> { - let dir_name = format!("{}.{}", name, LANCE_EXTENSION); - let full_path = self.base_path.child(dir_name.clone()); - self.object_store - .remove_dir_all(full_path.clone()) - .await - .map_err(|err| match err { - // this error is not lance::Error::DatasetNotFound, - // as the method `remove_dir_all` may be used to remove something not be a dataset - lance::Error::NotFound { .. 
} => Error::TableNotFound { - name: name.to_owned(), - }, - _ => Error::from(err), - })?; - - let object_store_params = ObjectStoreParams { - storage_options: Some(self.storage_options.clone()), - ..Default::default() - }; - let mut uri = self.uri.clone(); - if let Some(query_string) = &self.query_string { - uri.push_str(&format!("?{}", query_string)); - } - let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)) - .await - .unwrap(); - commit_handler.delete(&full_path).await.unwrap(); - Ok(()) - } - - async fn drop_db(&self) -> Result<()> { - self.object_store - .remove_dir_all(self.base_path.clone()) - .await?; - Ok(()) - } -} - #[cfg(all(test, feature = "remote"))] mod test_utils { use super::*; @@ -1182,6 +767,7 @@ mod test_utils { Self { internal, uri: "db://test".to_string(), + embedding_registry: Arc::new(MemoryRegistry::new()), } } } @@ -1189,11 +775,15 @@ mod test_utils { #[cfg(test)] mod tests { + use std::fs::create_dir_all; + + use arrow_array::RecordBatchReader; use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::tempdir; + use crate::database::listing::{ListingDatabaseOptions, NewTableConfig}; use crate::query::QueryBase; use crate::query::{ExecutableQuery, QueryExecutionOptions}; @@ -1310,11 +900,19 @@ mod tests { async fn test_create_table_v2() { let tmp_dir = tempdir().unwrap(); let uri = tmp_dir.path().to_str().unwrap(); - let db = connect(uri).execute().await.unwrap(); + let db = connect(uri) + .database_options(&ListingDatabaseOptions { + new_table_config: NewTableConfig { + data_storage_version: Some(LanceFileVersion::Legacy), + ..Default::default() + }, + }) + .execute() + .await + .unwrap(); let tbl = db .create_table("v1_test", make_data()) - .data_storage_version(LanceFileVersion::Legacy) .execute() .await .unwrap(); @@ -1334,9 +932,19 @@ mod tests { .unwrap(); assert_eq!(batches.len(), 20); + let db = connect(uri) + 
.database_options(&ListingDatabaseOptions { + new_table_config: NewTableConfig { + data_storage_version: Some(LanceFileVersion::Stable), + ..Default::default() + }, + }) + .execute() + .await + .unwrap(); + let tbl = db .create_table("v2_test", make_data()) - .data_storage_version(LanceFileVersion::Stable) .execute() .await .unwrap(); @@ -1390,8 +998,9 @@ mod tests { // TODO: None of the open table options are "inspectable" right now but once one is we // should assert we are passing these options in correctly db.create_empty_table("test", schema) - .mode(CreateTableMode::exist_ok(|builder| { - builder.index_cache_size(16) + .mode(CreateTableMode::exist_ok(|mut req| { + req.index_cache_size = Some(16); + req })) .execute() .await diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs new file mode 100644 index 00000000..2516e4b5 --- /dev/null +++ b/rust/lancedb/src/database.rs @@ -0,0 +1,133 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! The database module defines the `Database` trait and related types. +//! +//! A "database" is a generic concept for something that manages tables and their metadata. +//! +//! We provide a basic implementation of a database that requires no additional infrastructure +//! and is based off listing directories in a filesystem. +//! +//! Users may want to provide their own implementations for a variety of reasons: +//! * Tables may be arranged in a different order on the S3 filesystem +//! * Tables may be managed by some kind of independent application (e.g. some database) +//! * Tables may be managed by a database system (e.g. Postgres) +//! * A custom table implementation (e.g. remote table, etc.)
may be used + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatchReader; +use lance::dataset::ReadParams; + +use crate::error::Result; +use crate::table::{TableDefinition, TableInternal, WriteOptions}; + +pub mod listing; + +pub trait DatabaseOptions { + fn serialize_into_map(&self, map: &mut HashMap); +} + +/// A request to list names of tables in the database +#[derive(Clone, Debug, Default)] +pub struct TableNamesRequest { + /// If present, only return names that come lexicographically after the supplied + /// value. + /// + /// This can be combined with limit to implement pagination by setting this to + /// the last table name from the previous page. + pub start_after: Option, + /// The maximum number of table names to return + pub limit: Option, +} + +/// A request to open a table +#[derive(Clone, Debug)] +pub struct OpenTableRequest { + pub name: String, + pub index_cache_size: Option, + pub lance_read_params: Option, +} + +pub type TableBuilderCallback = Box OpenTableRequest + Send>; + +/// Describes what happens when creating a table and a table with +/// the same name already exists +pub enum CreateTableMode { + /// If the table already exists, an error is returned + Create, + /// If the table already exists, it is opened. Any provided data is + /// ignored. 
The function will be passed an OpenTableRequest to customize + /// how the table is opened + ExistOk(TableBuilderCallback), + /// If the table already exists, it is overwritten + Overwrite, +} + +impl CreateTableMode { + pub fn exist_ok( + callback: impl FnOnce(OpenTableRequest) -> OpenTableRequest + Send + 'static, + ) -> Self { + Self::ExistOk(Box::new(callback)) + } +} + +impl Default for CreateTableMode { + fn default() -> Self { + Self::Create + } +} + +/// The data to start a table or a schema to create an empty table +pub enum CreateTableData { + /// Creates a table using data, no schema required as it will be obtained from the data + Data(Box), + /// Creates an empty table, the definition / schema must be provided separately + Empty(TableDefinition), +} + +/// A request to create a table +pub struct CreateTableRequest { + /// The name of the new table + pub name: String, + /// Initial data to write to the table, or `CreateTableData::Empty` to create an empty table + pub data: CreateTableData, + /// The mode to use when creating the table + pub mode: CreateTableMode, + /// Options to use when writing data (only used if `data` is `CreateTableData::Data`) + pub write_options: WriteOptions, +} + +impl CreateTableRequest { + pub fn new(name: String, data: CreateTableData) -> Self { + Self { + name, + data, + mode: CreateTableMode::default(), + write_options: WriteOptions::default(), + } + } +} + +/// The `Database` trait defines the interface for database implementations. +/// +/// A database is responsible for managing tables and their metadata.
+#[async_trait::async_trait] +pub trait Database: + Send + Sync + std::any::Any + std::fmt::Debug + std::fmt::Display + 'static +{ + /// List the names of tables in the database + async fn table_names(&self, request: TableNamesRequest) -> Result>; + /// Create a table in the database + async fn create_table(&self, request: CreateTableRequest) -> Result>; + /// Open a table in the database + async fn open_table(&self, request: OpenTableRequest) -> Result>; + /// Rename a table in the database + async fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()>; + /// Drop a table in the database + async fn drop_table(&self, name: &str) -> Result<()>; + /// Drop all tables in the database + async fn drop_db(&self) -> Result<()>; + fn as_any(&self) -> &dyn std::any::Any; +} diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs new file mode 100644 index 00000000..38ce87ff --- /dev/null +++ b/rust/lancedb/src/database/listing.rs @@ -0,0 +1,545 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! 
Provides the `ListingDatabase`, a simple database where tables are folders in a directory + +use std::fs::create_dir_all; +use std::path::Path; +use std::{collections::HashMap, sync::Arc}; + +use arrow_array::RecordBatchIterator; +use lance::dataset::{ReadParams, WriteMode}; +use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore}; +use lance_encoding::version::LanceFileVersion; +use lance_table::io::commit::commit_handler_from_url; +use object_store::local::LocalFileSystem; +use snafu::{OptionExt, ResultExt}; + +use crate::connection::ConnectRequest; +use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result}; +use crate::io::object_store::MirroringObjectStoreWrapper; +use crate::table::NativeTable; +use crate::utils::validate_table_name; + +use super::{ + CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, + OpenTableRequest, TableInternal, TableNamesRequest, +}; + +/// File extension to indicate a lance table +pub const LANCE_FILE_EXTENSION: &str = "lance"; + +pub const OPT_NEW_TABLE_STORAGE_VERSION: &str = "new_table_data_storage_version"; +pub const OPT_NEW_TABLE_V2_MANIFEST_PATHS: &str = "new_table_enable_v2_manifest_paths"; + +/// Controls how new tables should be created +#[derive(Clone, Debug, Default)] +pub struct NewTableConfig { + /// The storage version to use for new tables + /// + /// If unset, then the latest stable version will be used + pub data_storage_version: Option, + /// Whether to enable V2 manifest paths for new tables + /// + /// V2 manifest paths are more efficient than V2 manifest paths but are not + /// supported by old clients. 
+ pub enable_v2_manifest_paths: Option, +} + +/// Options specific to the listing database +#[derive(Debug, Default, Clone)] +pub struct ListingDatabaseOptions { + /// Controls what kind of Lance tables will be created by this database + pub new_table_config: NewTableConfig, +} + +impl ListingDatabaseOptions { + fn parse_from_map(map: &HashMap) -> Result { + let new_table_config = NewTableConfig { + data_storage_version: map + .get(OPT_NEW_TABLE_STORAGE_VERSION) + .map(|s| s.parse()) + .transpose()?, + enable_v2_manifest_paths: map + .get(OPT_NEW_TABLE_V2_MANIFEST_PATHS) + .map(|s| { + s.parse::().map_err(|_| Error::InvalidInput { + message: format!( + "enable_v2_manifest_paths must be a boolean, received {}", + s + ), + }) + }) + .transpose()?, + }; + Ok(Self { new_table_config }) + } +} + +impl DatabaseOptions for ListingDatabaseOptions { + fn serialize_into_map(&self, map: &mut HashMap) { + if let Some(storage_version) = &self.new_table_config.data_storage_version { + map.insert( + OPT_NEW_TABLE_STORAGE_VERSION.to_string(), + storage_version.to_string(), + ); + } + if let Some(enable_v2_manifest_paths) = self.new_table_config.enable_v2_manifest_paths { + map.insert( + OPT_NEW_TABLE_V2_MANIFEST_PATHS.to_string(), + enable_v2_manifest_paths.to_string(), + ); + } + } +} + +/// A database that stores tables in a flat directory structure +/// +/// Tables are stored as directories in the base path of the object store. +/// +/// It is called a "listing database" because we use a "list directory" operation +/// to discover what tables are available. Table names are determined from the directory +/// names. +/// +/// For example, given the following directory structure: +/// +/// ```text +/// /data +/// /table1.lance +/// /table2.lance +/// ``` +/// +/// We will have two tables named `table1` and `table2`. 
+#[derive(Debug)] +pub struct ListingDatabase { + object_store: ObjectStore, + query_string: Option, + + pub(crate) uri: String, + pub(crate) base_path: object_store::path::Path, + + // the object store wrapper to use on write path + pub(crate) store_wrapper: Option>, + + read_consistency_interval: Option, + + // Storage options to be inherited by tables created from this connection + storage_options: HashMap, + + // Options for tables created by this connection + new_table_config: NewTableConfig, +} + +impl std::fmt::Display for ListingDatabase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ListingDatabase(uri={}, read_consistency_interval={})", + self.uri, + match self.read_consistency_interval { + None => { + "None".to_string() + } + Some(duration) => { + format!("{}s", duration.as_secs_f64()) + } + } + ) + } +} + +const LANCE_EXTENSION: &str = "lance"; +const ENGINE: &str = "engine"; +const MIRRORED_STORE: &str = "mirroredStore"; + +/// A connection to LanceDB +impl ListingDatabase { + /// Connect to a listing database + /// + /// The URI should be a path to a directory where the tables are stored. + /// + /// See [`ListingDatabaseOptions`] for options that can be set on the connection (via + /// `storage_options`). 
+ pub async fn connect_with_options(request: &ConnectRequest) -> Result { + let uri = &request.uri; + let parse_res = url::Url::parse(uri); + + let options = ListingDatabaseOptions::parse_from_map(&request.storage_options)?; + + // TODO: pass params regardless of OS + match parse_res { + Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { + Self::open_path( + uri, + request.read_consistency_interval, + options.new_table_config, + ) + .await + } + Ok(mut url) => { + // iter thru the query params and extract the commit store param + let mut engine = None; + let mut mirrored_store = None; + let mut filtered_querys = vec![]; + + // WARNING: specifying engine is NOT a publicly supported feature in lancedb yet + // THE API WILL CHANGE + for (key, value) in url.query_pairs() { + if key == ENGINE { + engine = Some(value.to_string()); + } else if key == MIRRORED_STORE { + if cfg!(windows) { + return Err(Error::NotSupported { + message: "mirrored store is not supported on windows".into(), + }); + } + mirrored_store = Some(value.to_string()); + } else { + // to owned so we can modify the url + filtered_querys.push((key.to_string(), value.to_string())); + } + } + + // Filter out the commit store query param -- it's a lancedb param + url.query_pairs_mut().clear(); + url.query_pairs_mut().extend_pairs(filtered_querys); + // Take a copy of the query string so we can propagate it to lance + let query_string = url.query().map(|s| s.to_string()); + // clear the query string so we can use the url as the base uri + // use .set_query(None) instead of .set_query("") because the latter + // will add a trailing '?' to the url + url.set_query(None); + + let table_base_uri = if let Some(store) = engine { + static WARN_ONCE: std::sync::Once = std::sync::Once::new(); + WARN_ONCE.call_once(|| { + log::warn!("Specifying engine is not a publicly supported feature in lancedb yet. 
THE API WILL CHANGE"); + }); + let old_scheme = url.scheme().to_string(); + let new_scheme = format!("{}+{}", old_scheme, store); + url.to_string().replacen(&old_scheme, &new_scheme, 1) + } else { + url.to_string() + }; + + let plain_uri = url.to_string(); + + let registry = Arc::new(ObjectStoreRegistry::default()); + let storage_options = request.storage_options.clone(); + let os_params = ObjectStoreParams { + storage_options: Some(storage_options.clone()), + ..Default::default() + }; + let (object_store, base_path) = + ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?; + if object_store.is_local() { + Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; + } + + let write_store_wrapper = match mirrored_store { + Some(path) => { + let mirrored_store = Arc::new(LocalFileSystem::new_with_prefix(path)?); + let wrapper = MirroringObjectStoreWrapper::new(mirrored_store); + Some(Arc::new(wrapper) as Arc) + } + None => None, + }; + + Ok(Self { + uri: table_base_uri, + query_string, + base_path, + object_store, + store_wrapper: write_store_wrapper, + read_consistency_interval: request.read_consistency_interval, + storage_options, + new_table_config: options.new_table_config, + }) + } + Err(_) => { + Self::open_path( + uri, + request.read_consistency_interval, + options.new_table_config, + ) + .await + } + } + } + + async fn open_path( + path: &str, + read_consistency_interval: Option, + new_table_config: NewTableConfig, + ) -> Result { + let (object_store, base_path) = ObjectStore::from_uri(path).await?; + if object_store.is_local() { + Self::try_create_dir(path).context(CreateDirSnafu { path })?; + } + + Ok(Self { + uri: path.to_string(), + query_string: None, + base_path, + object_store, + store_wrapper: None, + read_consistency_interval, + storage_options: HashMap::new(), + new_table_config, + }) + } + + /// Try to create a local directory to store the lancedb dataset + fn try_create_dir(path: &str) -> 
core::result::Result<(), std::io::Error> { + let path = Path::new(path); + if !path.try_exists()? { + create_dir_all(path)?; + } + Ok(()) + } + + /// Get the URI of a table in the database. + fn table_uri(&self, name: &str) -> Result { + validate_table_name(name)?; + + let path = Path::new(&self.uri); + let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); + + let mut uri = table_uri + .as_path() + .to_str() + .context(InvalidTableNameSnafu { + name, + reason: "Name is not valid URL", + })? + .to_string(); + + // If there are query string set on the connection, propagate to lance + if let Some(query) = self.query_string.as_ref() { + uri.push('?'); + uri.push_str(query.as_str()); + } + + Ok(uri) + } +} + +#[async_trait::async_trait] +impl Database for ListingDatabase { + async fn table_names(&self, request: TableNamesRequest) -> Result> { + let mut f = self + .object_store + .read_dir(self.base_path.clone()) + .await? + .iter() + .map(Path::new) + .filter(|path| { + let is_lance = path + .extension() + .and_then(|e| e.to_str()) + .map(|e| e == LANCE_EXTENSION); + is_lance.unwrap_or(false) + }) + .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) + .collect::>(); + f.sort(); + if let Some(start_after) = request.start_after { + let index = f + .iter() + .position(|name| name.as_str() > start_after.as_str()) + .unwrap_or(f.len()); + f.drain(0..index); + } + if let Some(limit) = request.limit { + f.truncate(limit as usize); + } + Ok(f) + } + + async fn create_table( + &self, + mut request: CreateTableRequest, + ) -> Result> { + let table_uri = self.table_uri(&request.name)?; + // Inherit storage options from the connection + let storage_options = request + .write_options + .lance_write_params + .get_or_insert_with(Default::default) + .store_params + .get_or_insert_with(Default::default) + .storage_options + .get_or_insert_with(Default::default); + for (key, value) in self.storage_options.iter() { + if 
!storage_options.contains_key(key) { + storage_options.insert(key.clone(), value.clone()); + } + } + + let storage_options = storage_options.clone(); + + let mut write_params = request.write_options.lance_write_params.unwrap_or_default(); + + if let Some(storage_version) = &self.new_table_config.data_storage_version { + write_params.data_storage_version = Some(*storage_version); + } else { + // Allow the user to override the storage version via storage options (backwards compatibility) + if let Some(data_storage_version) = storage_options.get(OPT_NEW_TABLE_STORAGE_VERSION) { + write_params.data_storage_version = Some(data_storage_version.parse()?); + } + } + if let Some(enable_v2_manifest_paths) = self.new_table_config.enable_v2_manifest_paths { + write_params.enable_v2_manifest_paths = enable_v2_manifest_paths; + } else { + // Allow the user to override the storage version via storage options (backwards compatibility) + if let Some(enable_v2_manifest_paths) = storage_options + .get(OPT_NEW_TABLE_V2_MANIFEST_PATHS) + .map(|s| s.parse::().unwrap()) + { + write_params.enable_v2_manifest_paths = enable_v2_manifest_paths; + } + } + + if matches!(&request.mode, CreateTableMode::Overwrite) { + write_params.mode = WriteMode::Overwrite; + } + + let data = match request.data { + CreateTableData::Data(data) => data, + CreateTableData::Empty(table_definition) => { + let schema = table_definition.schema.clone(); + Box::new(RecordBatchIterator::new(vec![], schema)) + } + }; + let data_schema = data.schema(); + + match NativeTable::create( + &table_uri, + &request.name, + data, + self.store_wrapper.clone(), + Some(write_params), + self.read_consistency_interval, + ) + .await + { + Ok(table) => Ok(Arc::new(table)), + Err(Error::TableAlreadyExists { name }) => match request.mode { + CreateTableMode::Create => Err(Error::TableAlreadyExists { name }), + CreateTableMode::ExistOk(callback) => { + let req = OpenTableRequest { + name: request.name.clone(), + index_cache_size: None, + 
lance_read_params: None, + }; + let req = (callback)(req); + let table = self.open_table(req).await?; + + let table_schema = table.schema().await?; + + if table_schema != data_schema { + return Err(Error::Schema { + message: "Provided schema does not match existing table schema" + .to_string(), + }); + } + + Ok(table) + } + CreateTableMode::Overwrite => unreachable!(), + }, + Err(err) => Err(err), + } + } + + async fn open_table(&self, mut request: OpenTableRequest) -> Result> { + let table_uri = self.table_uri(&request.name)?; + + // Inherit storage options from the connection + let storage_options = request + .lance_read_params + .get_or_insert_with(Default::default) + .store_options + .get_or_insert_with(Default::default) + .storage_options + .get_or_insert_with(Default::default); + for (key, value) in self.storage_options.iter() { + if !storage_options.contains_key(key) { + storage_options.insert(key.clone(), value.clone()); + } + } + + // Some ReadParams are exposed in the OpenTableBuilder, but we also + // let the user provide their own ReadParams. 
+ // + // If we have a user provided ReadParams use that + // If we don't then start with the default ReadParams and customize it with + // the options from the OpenTableBuilder + let read_params = request.lance_read_params.unwrap_or_else(|| { + let mut default_params = ReadParams::default(); + if let Some(index_cache_size) = request.index_cache_size { + default_params.index_cache_size = index_cache_size as usize; + } + default_params + }); + + let native_table = Arc::new( + NativeTable::open_with_params( + &table_uri, + &request.name, + self.store_wrapper.clone(), + Some(read_params), + self.read_consistency_interval, + ) + .await?, + ); + Ok(native_table) + } + + async fn rename_table(&self, _old_name: &str, _new_name: &str) -> Result<()> { + Err(Error::NotSupported { + message: "rename_table is not supported in LanceDB OSS".to_string(), + }) + } + + async fn drop_table(&self, name: &str) -> Result<()> { + let dir_name = format!("{}.{}", name, LANCE_EXTENSION); + let full_path = self.base_path.child(dir_name.clone()); + self.object_store + .remove_dir_all(full_path.clone()) + .await + .map_err(|err| match err { + // this error is not lance::Error::DatasetNotFound, + // as the method `remove_dir_all` may be used to remove something not be a dataset + lance::Error::NotFound { .. 
} => Error::TableNotFound { + name: name.to_owned(), + }, + _ => Error::from(err), + })?; + + let object_store_params = ObjectStoreParams { + storage_options: Some(self.storage_options.clone()), + ..Default::default() + }; + let mut uri = self.uri.clone(); + if let Some(query_string) = &self.query_string { + uri.push_str(&format!("?{}", query_string)); + } + let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)) + .await + .unwrap(); + commit_handler.delete(&full_path).await.unwrap(); + Ok(()) + } + + async fn drop_db(&self) -> Result<()> { + self.object_store + .remove_dir_all(self.base_path.clone()) + .await?; + Ok(()) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/rust/lancedb/src/lib.rs b/rust/lancedb/src/lib.rs index 014b4290..02127f2f 100644 --- a/rust/lancedb/src/lib.rs +++ b/rust/lancedb/src/lib.rs @@ -193,6 +193,7 @@ pub mod arrow; pub mod connection; pub mod data; +pub mod database; pub mod embeddings; pub mod error; pub mod index; diff --git a/rust/lancedb/src/query.rs b/rust/lancedb/src/query.rs index 001a131c..1e09d6a0 100644 --- a/rust/lancedb/src/query.rs +++ b/rust/lancedb/src/query.rs @@ -1015,7 +1015,7 @@ mod tests { use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; use tempfile::tempdir; - use crate::{connect, connection::CreateTableMode, Table}; + use crate::{connect, database::CreateTableMode, Table}; #[tokio::test] async fn test_setters_getters() { diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index bbc51c63..ed3a6f27 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -15,7 +15,7 @@ use crate::remote::db::RemoteOptions; const REQUEST_ID_HEADER: &str = "x-request-id"; /// Configuration for the LanceDB Cloud HTTP client. 
-#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ClientConfig { pub timeout_config: TimeoutConfig, pub retry_config: RetryConfig, @@ -36,7 +36,7 @@ impl Default for ClientConfig { } /// How to handle timeouts for HTTP requests. -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct TimeoutConfig { /// The timeout for creating a connection to the server. /// @@ -62,7 +62,7 @@ pub struct TimeoutConfig { } /// How to handle retries for HTTP requests. -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct RetryConfig { /// The number of times to retry a request if it fails. /// diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 9ee1e68d..31a88652 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow_array::RecordBatchReader; +use arrow_array::RecordBatchIterator; use async_trait::async_trait; use http::StatusCode; use lance_io::object_store::StorageOptions; @@ -13,13 +13,12 @@ use reqwest::header::CONTENT_TYPE; use serde::Deserialize; use tokio::task::spawn_blocking; -use crate::connection::{ - ConnectionInternal, CreateTableBuilder, CreateTableMode, NoData, OpenTableBuilder, - TableNamesBuilder, +use crate::database::{ + CreateTableData, CreateTableMode, CreateTableRequest, Database, OpenTableRequest, + TableNamesRequest, }; -use crate::embeddings::EmbeddingRegistry; use crate::error::Result; -use crate::Table; +use crate::table::TableInternal; use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender}; use super::table::RemoteTable; @@ -105,13 +104,13 @@ impl From<&CreateTableMode> for &'static str { } #[async_trait] -impl ConnectionInternal for RemoteDatabase { - async fn table_names(&self, options: TableNamesBuilder) -> Result> { +impl Database for RemoteDatabase { + async fn table_names(&self, request: TableNamesRequest) -> Result> { let mut req = 
self.client.get("/v1/table/"); - if let Some(limit) = options.limit { + if let Some(limit) = request.limit { req = req.query(&[("limit", limit)]); } - if let Some(start_after) = options.start_after { + if let Some(start_after) = request.start_after { req = req.query(&[("page_token", start_after)]); } let (request_id, rsp) = self.client.send(req, true).await?; @@ -127,11 +126,15 @@ impl ConnectionInternal for RemoteDatabase { Ok(tables) } - async fn do_create_table( - &self, - options: CreateTableBuilder, - data: Box, - ) -> Result
{ + async fn create_table(&self, request: CreateTableRequest) -> Result> { + let data = match request.data { + CreateTableData::Data(data) => data, + CreateTableData::Empty(table_definition) => { + let schema = table_definition.schema.clone(); + Box::new(RecordBatchIterator::new(vec![], schema)) + } + }; + // TODO: https://github.com/lancedb/lancedb/issues/1026 // We should accept data from an async source. In the meantime, spawn this as blocking // to make sure we don't block the tokio runtime if the source is slow. @@ -141,8 +144,8 @@ impl ConnectionInternal for RemoteDatabase { let req = self .client - .post(&format!("/v1/table/{}/create/", options.name)) - .query(&[("mode", Into::<&str>::into(&options.mode))]) + .post(&format!("/v1/table/{}/create/", request.name)) + .query(&[("mode", Into::<&str>::into(&request.mode))]) .body(data_buffer) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); @@ -151,14 +154,18 @@ impl ConnectionInternal for RemoteDatabase { if rsp.status() == StatusCode::BAD_REQUEST { let body = rsp.text().await.err_to_http(request_id.clone())?; if body.contains("already exists") { - return match options.mode { + return match request.mode { CreateTableMode::Create => { - Err(crate::Error::TableAlreadyExists { name: options.name }) + Err(crate::Error::TableAlreadyExists { name: request.name }) } CreateTableMode::ExistOk(callback) => { - let builder = OpenTableBuilder::new(options.parent, options.name); - let builder = (callback)(builder); - builder.execute().await + let req = OpenTableRequest { + name: request.name.clone(), + index_cache_size: None, + lance_read_params: None, + }; + let req = (callback)(req); + self.open_table(req).await } // This should not happen, as we explicitly set the mode to overwrite and the server @@ -183,31 +190,31 @@ impl ConnectionInternal for RemoteDatabase { self.client.check_response(&request_id, rsp).await?; - self.table_cache.insert(options.name.clone(), ()).await; + self.table_cache.insert(request.name.clone(), 
()).await; - Ok(Table::new(Arc::new(RemoteTable::new( + Ok(Arc::new(RemoteTable::new( self.client.clone(), - options.name, - )))) + request.name, + ))) } - async fn do_open_table(&self, options: OpenTableBuilder) -> Result
{ + async fn open_table(&self, request: OpenTableRequest) -> Result> { // We describe the table to confirm it exists before moving on. - if self.table_cache.get(&options.name).is_none() { + if self.table_cache.get(&request.name).is_none() { let req = self .client - .post(&format!("/v1/table/{}/describe/", options.name)); + .post(&format!("/v1/table/{}/describe/", request.name)); let (request_id, resp) = self.client.send(req, true).await?; if resp.status() == StatusCode::NOT_FOUND { - return Err(crate::Error::TableNotFound { name: options.name }); + return Err(crate::Error::TableNotFound { name: request.name }); } self.client.check_response(&request_id, resp).await?; } - Ok(Table::new(Arc::new(RemoteTable::new( + Ok(Arc::new(RemoteTable::new( self.client.clone(), - options.name, - )))) + request.name, + ))) } async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> { @@ -236,8 +243,8 @@ impl ConnectionInternal for RemoteDatabase { }) } - fn embedding_registry(&self) -> &dyn EmbeddingRegistry { - todo!() + fn as_any(&self) -> &dyn std::any::Any { + self } } @@ -273,7 +280,7 @@ mod tests { use crate::connection::ConnectBuilder; use crate::{ - connection::CreateTableMode, + database::CreateTableMode, remote::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE}, Connection, Error, }; diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 7cf98361..b8315638 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -230,6 +230,24 @@ pub struct OptimizeStats { pub prune: Option, } +/// Describes what happens when a vector either contains NaN or +/// does not have enough values +#[derive(Clone, Debug, Default)] +enum BadVectorHandling { + /// An error is returned + #[default] + Error, + #[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992 + /// The offending row is droppped + Drop, + #[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992 + /// The invalid/missing items are replaced by fill_value 
+ Fill(f32), + #[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992 + /// The invalid items are replaced by NULL + None, +} + /// Options to use when writing data #[derive(Clone, Debug, Default)] pub struct WriteOptions { @@ -364,7 +382,7 @@ impl UpdateBuilder { } #[async_trait] -pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Sync { +pub trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Sync { #[allow(dead_code)] fn as_any(&self) -> &dyn std::any::Any; /// Cast as [`NativeTable`], or return None it if is not a [`NativeTable`]. @@ -465,7 +483,7 @@ impl std::fmt::Display for Table { } impl Table { - pub(crate) fn new(inner: Arc) -> Self { + pub fn new(inner: Arc) -> Self { Self { inner, embedding_registry: Arc::new(MemoryRegistry::new()), @@ -1164,7 +1182,7 @@ impl NativeTable { /// # Returns /// /// * A [TableImpl] object. - pub(crate) async fn create( + pub async fn create( uri: &str, name: &str, batches: impl RecordBatchReader + Send + 'static,