feat!: refactor ConnectionInternal into a Database trait (#2067)

This opens up the door for more custom database implementations than the
two we have today. The biggest change should be invisible:
`ConnectionInternal` has been renamed to `Database`, made public, and
refactored.

However, there are a few breaking changes. `data_storage_version` and
`enable_v2_manifest_paths` have been moved from options on
`create_table` to options for the database which are now set via
`storage_options`.

Before:
```
db = connect(uri)
tbl = db.create_table("my_table", data, data_storage_version="legacy", enable_v2_manifest_paths=True)
```

After:
```
db = connect(uri, storage_options={
  "new_table_enable_v2_manifest_paths": "true",
  "new_table_data_storage_version": "legacy"
})
tbl = db.create_table("my_table", data)
```

BREAKING CHANGE: the data_storage_version, enable_v2_manifest_paths
options have moved from options to create_table to storage_options.
BREAKING CHANGE: the use_legacy_format option has been removed,
data_storage_version has replaced it for some time now
This commit is contained in:
Weston Pace
2025-02-04 14:35:14 -08:00
committed by GitHub
parent f6eef14313
commit c269524b2f
20 changed files with 1131 additions and 876 deletions

View File

@@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.2.2
rev: v0.8.4
hooks:
- id: ruff
- repo: local

2
Cargo.lock generated
View File

@@ -4235,7 +4235,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.18.1-beta.3"
version = "0.18.1-beta.4"
dependencies = [
"arrow",
"env_logger 0.10.2",

View File

@@ -8,7 +8,7 @@
## Properties
### dataStorageVersion?
### ~~dataStorageVersion?~~
```ts
optional dataStorageVersion: string;
@@ -19,6 +19,10 @@ The version of the data storage format to use.
The default is `stable`.
Set to "legacy" to use the old format.
#### Deprecated
Pass `new_table_data_storage_version` to storageOptions instead.
***
### embeddingFunction?
@@ -29,7 +33,7 @@ optional embeddingFunction: EmbeddingFunctionConfig;
***
### enableV2ManifestPaths?
### ~~enableV2ManifestPaths?~~
```ts
optional enableV2ManifestPaths: boolean;
@@ -41,6 +45,10 @@ turning this on will make the dataset unreadable for older versions
of LanceDB (prior to 0.10.0). To migrate an existing dataset, instead
use the LocalTable#migrateManifestPathsV2 method.
#### Deprecated
Pass `new_table_enable_v2_manifest_paths` to storageOptions instead.
***
### existOk
@@ -90,17 +98,3 @@ Options already set on the connection will be inherited by the table,
but can be overridden here.
The available options are described at https://lancedb.github.io/lancedb/guides/storage/
***
### useLegacyFormat?
```ts
optional useLegacyFormat: boolean;
```
If true then data files will be written with the legacy format
The default is false.
Deprecated. Use data storage version instead.

View File

@@ -17,14 +17,14 @@ describe("when connecting", () => {
it("should connect", async () => {
const db = await connect(tmpDir.name);
expect(db.display()).toBe(
`NativeDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`,
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`,
);
});
it("should allow read consistency interval to be specified", async () => {
const db = await connect(tmpDir.name, { readConsistencyInterval: 5 });
expect(db.display()).toBe(
`NativeDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`,
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`,
);
});
});
@@ -96,14 +96,15 @@ describe("given a connection", () => {
const data = [...Array(10000).keys()].map((i) => ({ id: i }));
// Create in v1 mode
let table = await db.createTable("test", data, { useLegacyFormat: true });
let table = await db.createTable("test", data, {
storageOptions: { newTableDataStorageVersion: "legacy" },
});
const isV2 = async (table: Table) => {
const data = await table
.query()
.limit(10000)
.toArrow({ maxBatchLength: 100000 });
console.log(data.batches.length);
return data.batches.length < 5;
};
@@ -122,7 +123,7 @@ describe("given a connection", () => {
const schema = new Schema([new Field("id", new Float64(), true)]);
table = await db.createEmptyTable("test_v2_empty", schema, {
useLegacyFormat: false,
storageOptions: { newTableDataStorageVersion: "stable" },
});
await table.add(data);

View File

@@ -52,6 +52,8 @@ export interface CreateTableOptions {
*
* The default is `stable`.
* Set to "legacy" to use the old format.
*
* @deprecated Pass `new_table_data_storage_version` to storageOptions instead.
*/
dataStorageVersion?: string;
@@ -61,17 +63,11 @@ export interface CreateTableOptions {
* turning this on will make the dataset unreadable for older versions
* of LanceDB (prior to 0.10.0). To migrate an existing dataset, instead
* use the {@link LocalTable#migrateManifestPathsV2} method.
*
* @deprecated Pass `new_table_enable_v2_manifest_paths` to storageOptions instead.
*/
enableV2ManifestPaths?: boolean;
/**
* If true then data files will be written with the legacy format
*
* The default is false.
*
* Deprecated. Use data storage version instead.
*/
useLegacyFormat?: boolean;
schema?: SchemaLike;
embeddingFunction?: EmbeddingFunctionConfig;
}
@@ -256,6 +252,28 @@ export class LocalConnection extends Connection {
return new LocalTable(innerTable);
}
/**
 * Build the storage options to send when creating a table.
 *
 * The deprecated `dataStorageVersion` and `enableV2ManifestPaths` create-table
 * options are translated into the `newTableDataStorageVersion` and
 * `newTableEnableV2ManifestPaths` storage options. When set, they take
 * precedence over the same keys in `options.storageOptions`.
 *
 * The caller's `options` object (and its `storageOptions`) is never modified;
 * a merged copy is built instead.
 *
 * @param options - the create-table options supplied by the caller, if any
 * @returns the result of `cleanseStorageOptions` applied to the merged options
 */
private getStorageOptions(
  options?: Partial<CreateTableOptions>,
): Record<string, string> | undefined {
  // Collect the translated deprecated options separately so the caller's
  // objects are left untouched.
  const overrides: Record<string, string> = {};
  if (options?.dataStorageVersion !== undefined) {
    overrides["newTableDataStorageVersion"] = options.dataStorageVersion;
  }
  if (options?.enableV2ManifestPaths !== undefined) {
    overrides["newTableEnableV2ManifestPaths"] =
      options.enableV2ManifestPaths ? "true" : "false";
  }

  // Nothing to translate: pass the caller's storage options through as-is
  // (possibly undefined).
  if (Object.keys(overrides).length === 0) {
    return cleanseStorageOptions(options?.storageOptions);
  }

  // Spreading `undefined` is a no-op, so this also covers the case where the
  // caller supplied no storage options at all.
  return cleanseStorageOptions({ ...options?.storageOptions, ...overrides });
}
async createTable(
nameOrOptions:
| string
@@ -272,20 +290,14 @@ export class LocalConnection extends Connection {
throw new Error("data is required");
}
const { buf, mode } = await parseTableData(data, options);
let dataStorageVersion = "stable";
if (options?.dataStorageVersion !== undefined) {
dataStorageVersion = options.dataStorageVersion;
} else if (options?.useLegacyFormat !== undefined) {
dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable";
}
const storageOptions = this.getStorageOptions(options);
const innerTable = await this.inner.createTable(
nameOrOptions,
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
dataStorageVersion,
options?.enableV2ManifestPaths,
storageOptions,
);
return new LocalTable(innerTable);
@@ -309,22 +321,14 @@ export class LocalConnection extends Connection {
metadata = registry.getTableMetadata([embeddingFunction]);
}
let dataStorageVersion = "stable";
if (options?.dataStorageVersion !== undefined) {
dataStorageVersion = options.dataStorageVersion;
} else if (options?.useLegacyFormat !== undefined) {
dataStorageVersion = options.useLegacyFormat ? "legacy" : "stable";
}
const storageOptions = this.getStorageOptions(options);
const table = makeEmptyTable(schema, metadata);
const buf = await fromTableToBuffer(table);
const innerTable = await this.inner.createEmptyTable(
name,
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
dataStorageVersion,
options?.enableV2ManifestPaths,
storageOptions,
);
return new LocalTable(innerTable);
}

View File

@@ -2,17 +2,15 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::collections::HashMap;
use std::str::FromStr;
use lancedb::database::CreateTableMode;
use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::error::{convert_error, NapiErrorExt};
use crate::error::NapiErrorExt;
use crate::table::Table;
use crate::ConnectionOptions;
use lancedb::connection::{
ConnectBuilder, Connection as LanceDBConnection, CreateTableMode, LanceFileVersion,
};
use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
#[napi]
@@ -124,8 +122,6 @@ impl Connection {
buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
data_storage_options: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
@@ -137,14 +133,6 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(data_storage_option) = data_storage_options.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_option).map_err(|e| convert_error(&e))?,
);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
let tbl = builder.execute().await.default_error()?;
Ok(Table::new(tbl))
}
@@ -156,8 +144,6 @@ impl Connection {
schema_buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
data_storage_options: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> napi::Result<Table> {
let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| {
napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e))
@@ -172,14 +158,6 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(data_storage_option) = data_storage_options.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_option).map_err(|e| convert_error(&e))?,
);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
let tbl = builder.execute().await.default_error()?;
Ok(Table::new(tbl))
}

2
python/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# Test data created by some example tests
data/

View File

@@ -15,8 +15,6 @@ class Connection(object):
mode: str,
data: pa.RecordBatchReader,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table: ...
async def create_empty_table(
self,
@@ -24,8 +22,6 @@ class Connection(object):
mode: str,
schema: pa.Schema,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table: ...
async def rename_table(self, old_name: str, new_name: str) -> None: ...
async def drop_table(self, name: str) -> None: ...

View File

@@ -119,19 +119,11 @@ class DBConnection(EnforceOverrides):
See available options at
<https://lancedb.github.io/lancedb/guides/storage/>
data_storage_version: optional, str, default "stable"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"stable" which will use the legacy v2 version. See the user guide
for more details.
enable_v2_manifest_paths: bool, optional, default False
Use the new V2 manifest paths. These paths provide more efficient
opening of datasets with many versions on object stores. WARNING:
turning this on will make the dataset unreadable for older versions
of LanceDB (prior to 0.13.0). To migrate an existing dataset, instead
use the
[Table.migrate_manifest_paths_v2][lancedb.table.Table.migrate_v2_manifest_paths]
method.
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_data_storage_version` in the options.
enable_v2_manifest_paths: optional, bool, default False
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_enable_v2_manifest_paths` in the options.
Returns
-------
LanceTable
@@ -452,8 +444,6 @@ class LanceDBConnection(DBConnection):
fill_value=fill_value,
embedding_functions=embedding_functions,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
return tbl
@@ -595,9 +585,6 @@ class AsyncConnection(object):
storage_options: Optional[Dict[str, str]] = None,
*,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -640,23 +627,6 @@ class AsyncConnection(object):
connection will be inherited by the table, but can be overridden here.
See available options at
<https://lancedb.github.io/lancedb/guides/storage/>
data_storage_version: optional, str, default "stable"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"stable" which will use the legacy v2 version. See the user guide
for more details.
use_legacy_format: bool, optional, default False. (Deprecated)
If True, use the legacy format for the table. If False, use the new format.
This method is deprecated, use `data_storage_version` instead.
enable_v2_manifest_paths: bool, optional, default False
Use the new V2 manifest paths. These paths provide more efficient
opening of datasets with many versions on object stores. WARNING:
turning this on will make the dataset unreadable for older versions
of LanceDB (prior to 0.13.0). To migrate an existing dataset, instead
use the
[AsyncTable.migrate_manifest_paths_v2][lancedb.table.AsyncTable.migrate_manifest_paths_v2]
method.
Returns
-------
@@ -795,17 +765,12 @@ class AsyncConnection(object):
if mode == "create" and exist_ok:
mode = "exist_ok"
if not data_storage_version:
data_storage_version = "legacy" if use_legacy_format else "stable"
if data is None:
new_table = await self._inner.create_empty_table(
name,
mode,
schema,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
else:
data = data_to_reader(data, schema)
@@ -814,8 +779,6 @@ class AsyncConnection(object):
mode,
data,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
return AsyncTable(new_table)

View File

@@ -4,6 +4,7 @@
from __future__ import annotations
import inspect
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
@@ -2085,10 +2086,37 @@ class LanceTable(Table):
The value to use when filling vectors. Only used if on_bad_vectors="fill".
embedding_functions: list of EmbeddingFunctionModel, default None
The embedding functions to use when creating the table.
data_storage_version: optional, str, default "stable"
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_data_storage_version` in the options.
enable_v2_manifest_paths: optional, bool, default False
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_enable_v2_manifest_paths` in the options.
"""
self = cls.__new__(cls)
self._conn = db
# Translate the deprecated create_table keyword arguments into storage options.
if data_storage_version is not None:
    # The two string literals are concatenated into ONE message; a comma
    # between them would pass the second string as the warning category and
    # raise a TypeError.
    warnings.warn(
        "setting data_storage_version directly on create_table is deprecated. "
        "Use storage_options instead.",
        DeprecationWarning,
    )
    # Copy so the caller's dict is not mutated.
    storage_options = dict(storage_options or {})
    storage_options["new_table_data_storage_version"] = data_storage_version
if enable_v2_manifest_paths is not None:
    warnings.warn(
        "setting enable_v2_manifest_paths directly on create_table is "
        "deprecated. Use storage_options instead.",
        DeprecationWarning,
    )
    # Copy so the caller's dict is not mutated.
    storage_options = dict(storage_options or {})
    # Storage options are string-valued and the option is parsed as a
    # lowercase boolean ("true" / "false"), so a Python bool must not be
    # stored directly.
    storage_options["new_table_enable_v2_manifest_paths"] = (
        "true" if enable_v2_manifest_paths else "false"
    )
self._table = LOOP.run(
self._conn._conn.create_table(
name,
@@ -2100,8 +2128,6 @@ class LanceTable(Table):
fill_value=fill_value,
embedding_functions=embedding_functions,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
)
return self

View File

@@ -299,12 +299,12 @@ def test_create_exist_ok(tmp_db: lancedb.DBConnection):
@pytest.mark.asyncio
async def test_connect(tmp_path):
db = await lancedb.connect_async(tmp_path)
assert str(db) == f"NativeDatabase(uri={tmp_path}, read_consistency_interval=None)"
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=5)
)
assert str(db) == f"NativeDatabase(uri={tmp_path}, read_consistency_interval=5s)"
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)"
@pytest.mark.asyncio
@@ -396,13 +396,16 @@ async def test_create_exist_ok_async(tmp_db_async: lancedb.AsyncConnection):
@pytest.mark.asyncio
async def test_create_table_v2_manifest_paths_async(tmp_path):
db = await lancedb.connect_async(tmp_path)
db_with_v2_paths = await lancedb.connect_async(
tmp_path, storage_options={"new_table_enable_v2_manifest_paths": "true"}
)
db_no_v2_paths = await lancedb.connect_async(
tmp_path, storage_options={"new_table_enable_v2_manifest_paths": "false"}
)
# Create table in v2 mode with v2 manifest paths enabled
tbl = await db.create_table(
tbl = await db_with_v2_paths.create_table(
"test_v2_manifest_paths",
data=[{"id": 0}],
use_legacy_format=False,
enable_v2_manifest_paths=True,
)
assert await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
@@ -410,11 +413,9 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert re.match(r"\d{20}\.manifest", manifest)
# Start a table in V1 mode then migrate
tbl = await db.create_table(
tbl = await db_no_v2_paths.create_table(
"test_v2_migration",
data=[{"id": 0}],
use_legacy_format=False,
enable_v2_manifest_paths=False,
)
assert not await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
@@ -583,7 +584,7 @@ def test_empty_or_nonexistent_table(mem_db: lancedb.DBConnection):
@pytest.mark.asyncio
async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection):
async def test_create_in_v2_mode():
def make_data():
for i in range(10):
yield pa.record_batch([pa.array([x for x in range(1024)])], names=["x"])
@@ -594,10 +595,13 @@ async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection):
schema = pa.schema([pa.field("x", pa.int64())])
# Create table in v1 mode
tbl = await mem_db_async.create_table(
"test", data=make_data(), schema=schema, data_storage_version="legacy"
v1_db = await lancedb.connect_async(
"memory://", storage_options={"new_table_data_storage_version": "legacy"}
)
tbl = await v1_db.create_table("test", data=make_data(), schema=schema)
async def is_in_v2_mode(tbl):
batches = (
await tbl.query().limit(10 * 1024).to_batches(max_batch_length=1024 * 10)
@@ -610,10 +614,12 @@ async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection):
assert not await is_in_v2_mode(tbl)
# Create table in v2 mode
tbl = await mem_db_async.create_table(
"test_v2", data=make_data(), schema=schema, use_legacy_format=False
v2_db = await lancedb.connect_async(
"memory://", storage_options={"new_table_data_storage_version": "stable"}
)
tbl = await v2_db.create_table("test_v2", data=make_data(), schema=schema)
assert await is_in_v2_mode(tbl)
# Add data (should remain in v2 mode)
@@ -622,20 +628,18 @@ async def test_create_in_v2_mode(mem_db_async: lancedb.AsyncConnection):
assert await is_in_v2_mode(tbl)
# Create empty table in v2 mode and add data
tbl = await mem_db_async.create_table(
"test_empty_v2", data=None, schema=schema, use_legacy_format=False
)
tbl = await v2_db.create_table("test_empty_v2", data=None, schema=schema)
await tbl.add(make_table())
assert await is_in_v2_mode(tbl)
# Create empty table uses v1 mode by default
tbl = await mem_db_async.create_table(
"test_empty_v2_default", data=None, schema=schema, data_storage_version="legacy"
)
# Db uses v2 mode by default
db = await lancedb.connect_async("memory://")
tbl = await db.create_table("test_empty_v2_default", data=None, schema=schema)
await tbl.add(make_table())
assert not await is_in_v2_mode(tbl)
assert await is_in_v2_mode(tbl)
def test_replace_index(mem_db: lancedb.DBConnection):

View File

@@ -1,10 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::connection::{Connection as LanceConnection, CreateTableMode, LanceFileVersion};
use lancedb::{connection::Connection as LanceConnection, database::CreateTableMode};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods, Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
@@ -80,15 +80,13 @@ impl Connection {
future_into_py(self_.py(), async move { op.execute().await.infer_error() })
}
#[pyo3(signature = (name, mode, data, storage_options=None, data_storage_version=None, enable_v2_manifest_paths=None))]
#[pyo3(signature = (name, mode, data, storage_options=None))]
pub fn create_table<'a>(
self_: PyRef<'a, Self>,
name: String,
mode: &str,
data: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
data_storage_version: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -101,32 +99,19 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)
.map_err(|e| PyValueError::new_err(e.to_string()))?,
);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))
})
}
#[pyo3(signature = (name, mode, schema, storage_options=None, data_storage_version=None, enable_v2_manifest_paths=None))]
#[pyo3(signature = (name, mode, schema, storage_options=None))]
pub fn create_empty_table<'a>(
self_: PyRef<'a, Self>,
name: String,
mode: &str,
schema: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
data_storage_version: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -140,17 +125,6 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)
.map_err(|e| PyValueError::new_err(e.to_string()))?,
);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,133 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! The database module defines the `Database` trait and related types.
//!
//! A "database" is a generic concept for something that manages tables and their metadata.
//!
//! We provide a basic implementation of a database that requires no additional infrastructure
//! and is based off listing directories in a filesystem.
//!
//! Users may want to provide their own implementations for a variety of reasons:
//! * Tables may be arranged in a different order on the S3 filesystem
//! * Tables may be managed by some kind of independent application (e.g. some database)
//! * Tables may be managed by a database system (e.g. Postgres)
//! * A custom table implementation (e.g. remote table, etc.) may be used
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use lance::dataset::ReadParams;
use crate::error::Result;
use crate::table::{TableDefinition, TableInternal, WriteOptions};
pub mod listing;
/// Options for a database implementation that can be written out as string
/// key/value pairs.
pub trait DatabaseOptions {
/// Writes the string form of these options into `map`.
///
/// NOTE(review): the `ListingDatabaseOptions` implementation only inserts
/// options that are set — confirm whether other implementations are expected
/// to behave the same way.
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
}
/// A request to list names of tables in the database
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Clone, Debug, Default)]
pub struct TableNamesRequest {
/// If present, only return names that come lexicographically after the supplied
/// value.
///
/// This can be combined with limit to implement pagination by setting this to
/// the last table name from the previous page.
pub start_after: Option<String>,
/// The maximum number of table names to return
///
/// NOTE(review): what happens when this is `None` is decided by the `Database`
/// implementation and is not visible here — confirm.
pub limit: Option<u32>,
}
/// A request to open a table
#[derive(Clone, Debug)]
pub struct OpenTableRequest {
/// The name of the table to open
pub name: String,
/// Optional size of the index cache for the opened table
///
/// NOTE(review): the unit and the default used when `None` are not visible
/// here — confirm against the `Database` implementation.
pub index_cache_size: Option<u32>,
/// Optional Lance `ReadParams` for the opened table
///
/// NOTE(review): presumably forwarded to Lance when the dataset is opened — confirm.
pub lance_read_params: Option<ReadParams>,
}
/// A one-shot, sendable callback that takes an [`OpenTableRequest`] and returns
/// the request that should be used instead.
///
/// Carried by [`CreateTableMode::ExistOk`].
pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableRequest) -> OpenTableRequest + Send>;
/// Describes what happens when creating a table and a table with
/// the same name already exists
pub enum CreateTableMode {
/// If the table already exists, an error is returned
Create,
/// If the table already exists, it is opened. Any provided data is
/// ignored. The function will be passed an [`OpenTableRequest`] and
/// returns the request to use, which allows customizing how the table
/// is opened
ExistOk(TableBuilderCallback),
/// If the table already exists, it is overwritten
Overwrite,
}
impl CreateTableMode {
pub fn exist_ok(
callback: impl FnOnce(OpenTableRequest) -> OpenTableRequest + Send + 'static,
) -> Self {
Self::ExistOk(Box::new(callback))
}
}
impl Default for CreateTableMode {
fn default() -> Self {
Self::Create
}
}
/// The data to start a table or a schema to create an empty table
pub enum CreateTableData {
/// Creates a table using data, no schema required as it will be obtained from the data
Data(Box<dyn RecordBatchReader + Send>),
/// Creates an empty table from the supplied table definition
Empty(TableDefinition),
}
/// A request to create a table
pub struct CreateTableRequest {
/// The name of the new table
pub name: String,
/// Initial data to write to the table, or the table definition to use when
/// creating an empty table (see [`CreateTableData`])
pub data: CreateTableData,
/// The mode to use when creating the table
pub mode: CreateTableMode,
/// Options to use when writing data
///
/// NOTE(review): presumably unused when `data` is `CreateTableData::Empty` — confirm.
pub write_options: WriteOptions,
}
impl CreateTableRequest {
    /// Creates a request for a table called `name` with the given `data`.
    ///
    /// The mode and the write options are set to their `Default` values.
    pub fn new(name: String, data: CreateTableData) -> Self {
        let mode = CreateTableMode::default();
        let write_options = WriteOptions::default();
        Self {
            name,
            data,
            mode,
            write_options,
        }
    }
}
/// The `Database` trait defines the interface for database implementations.
///
/// A database is responsible for managing tables and their metadata.
#[async_trait::async_trait]
pub trait Database:
Send + Sync + std::any::Any + std::fmt::Debug + std::fmt::Display + 'static
{
/// List the names of tables in the database
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
/// Create a table in the database
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn TableInternal>>;
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn TableInternal>>;
/// Rename a table in the database
async fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()>;
/// Drop a table in the database
async fn drop_table(&self, name: &str) -> Result<()>;
/// Drop all tables in the database
async fn drop_db(&self) -> Result<()>;
/// Returns this database as a `&dyn Any`
///
/// NOTE(review): presumably so callers can downcast to a concrete
/// implementation — confirm against call sites.
fn as_any(&self) -> &dyn std::any::Any;
}

View File

@@ -0,0 +1,545 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Provides the `ListingDatabase`, a simple database where tables are folders in a directory
use std::fs::create_dir_all;
use std::path::Path;
use std::{collections::HashMap, sync::Arc};
use arrow_array::RecordBatchIterator;
use lance::dataset::{ReadParams, WriteMode};
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore};
use lance_encoding::version::LanceFileVersion;
use lance_table::io::commit::commit_handler_from_url;
use object_store::local::LocalFileSystem;
use snafu::{OptionExt, ResultExt};
use crate::connection::ConnectRequest;
use crate::error::{CreateDirSnafu, Error, InvalidTableNameSnafu, Result};
use crate::io::object_store::MirroringObjectStoreWrapper;
use crate::table::NativeTable;
use crate::utils::validate_table_name;
use super::{
CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableInternal, TableNamesRequest,
};
/// File extension to indicate a lance table
pub const LANCE_FILE_EXTENSION: &str = "lance";
/// Storage option key holding the data storage version to use for new tables
pub const OPT_NEW_TABLE_STORAGE_VERSION: &str = "new_table_data_storage_version";
/// Storage option key holding whether new tables should use V2 manifest paths
///
/// The value is parsed as a Rust `bool`, i.e. it must be `"true"` or `"false"`.
pub const OPT_NEW_TABLE_V2_MANIFEST_PATHS: &str = "new_table_enable_v2_manifest_paths";
/// Controls how new tables should be created
#[derive(Clone, Debug, Default)]
pub struct NewTableConfig {
/// The storage version to use for new tables
///
/// If unset, then the latest stable version will be used
pub data_storage_version: Option<LanceFileVersion>,
/// Whether to enable V2 manifest paths for new tables
///
/// V2 manifest paths are more efficient than V1 manifest paths but are not
/// supported by old clients.
pub enable_v2_manifest_paths: Option<bool>,
}
/// Options specific to the listing database
///
/// These are read from (and written back to) a string map using the
/// `new_table_data_storage_version` and `new_table_enable_v2_manifest_paths` keys.
#[derive(Debug, Default, Clone)]
pub struct ListingDatabaseOptions {
/// Controls what kind of Lance tables will be created by this database
pub new_table_config: NewTableConfig,
}
impl ListingDatabaseOptions {
fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
let new_table_config = NewTableConfig {
data_storage_version: map
.get(OPT_NEW_TABLE_STORAGE_VERSION)
.map(|s| s.parse())
.transpose()?,
enable_v2_manifest_paths: map
.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS)
.map(|s| {
s.parse::<bool>().map_err(|_| Error::InvalidInput {
message: format!(
"enable_v2_manifest_paths must be a boolean, received {}",
s
),
})
})
.transpose()?,
};
Ok(Self { new_table_config })
}
}
impl DatabaseOptions for ListingDatabaseOptions {
    /// Writes every option that is set into `map`; unset options add nothing.
    fn serialize_into_map(&self, map: &mut HashMap<String, String>) {
        let config = &self.new_table_config;

        if let Some(version) = config.data_storage_version.as_ref() {
            let key = OPT_NEW_TABLE_STORAGE_VERSION.to_string();
            map.insert(key, version.to_string());
        }

        if let Some(flag) = config.enable_v2_manifest_paths {
            let key = OPT_NEW_TABLE_V2_MANIFEST_PATHS.to_string();
            map.insert(key, flag.to_string());
        }
    }
}
/// A database that stores tables in a flat directory structure
///
/// Tables are stored as directories in the base path of the object store.
///
/// It is called a "listing database" because we use a "list directory" operation
/// to discover what tables are available. Table names are determined from the directory
/// names.
///
/// For example, given the following directory structure:
///
/// ```text
/// /data
/// /table1.lance
/// /table2.lance
/// ```
///
/// We will have two tables named `table1` and `table2`.
#[derive(Debug)]
pub struct ListingDatabase {
// NOTE(review): presumably the store used to list and access table directories — confirm
object_store: ObjectStore,
// NOTE(review): presumably the query string of the connection URI, kept so it
// can be propagated to lance — confirm
query_string: Option<String>,
// the URI of the database, shown by the `Display` implementation
pub(crate) uri: String,
// NOTE(review): presumably the path within the object store under which table
// directories live — confirm
pub(crate) base_path: object_store::path::Path,
// the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
// shown by the `Display` implementation; `None` is printed as "None"
read_consistency_interval: Option<std::time::Duration>,
// Storage options to be inherited by tables created from this connection
storage_options: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
}
impl std::fmt::Display for ListingDatabase {
    /// Formats the database as
    /// `ListingDatabase(uri=<uri>, read_consistency_interval=<interval>)`.
    ///
    /// The interval is printed as fractional seconds followed by `s`, or as
    /// `None` when it is unset.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let interval = self
            .read_consistency_interval
            .map(|duration| format!("{}s", duration.as_secs_f64()))
            .unwrap_or_else(|| "None".to_string());
        write!(
            f,
            "ListingDatabase(uri={}, read_consistency_interval={})",
            self.uri, interval
        )
    }
}
// Extension of the directories in the base path that are treated as tables
const LANCE_EXTENSION: &str = "lance";
// Connection-URI query parameter whose value is appended to the URI scheme as
// `<scheme>+<engine>`.  Not a publicly supported feature yet.
const ENGINE: &str = "engine";
// Connection-URI query parameter holding a local path; the path is handed to a
// `MirroringObjectStoreWrapper` used on the write path.  Rejected on Windows.
const MIRRORED_STORE: &str = "mirroredStore";
/// Constructors and URI helpers for [`ListingDatabase`]
impl ListingDatabase {
/// Connect to a listing database
///
/// The URI should be a path to a directory where the tables are stored.
///
/// See [`ListingDatabaseOptions`] for options that can be set on the connection (via
/// `storage_options`).
///
/// Input that does not parse as a URL - and, on Windows, a URL whose scheme is a single
/// character (a drive letter) - is opened as a plain path.  Otherwise the `engine` and
/// `mirroredStore` query parameters are consumed here and the remaining query string is
/// kept so it can be propagated to lance.
///
/// # Errors
///
/// Fails if the storage options cannot be parsed, if `mirroredStore` is used on
/// Windows, if the object store cannot be created, or if a local directory cannot be
/// created.
pub async fn connect_with_options(request: &ConnectRequest) -> Result<Self> {
    let uri = &request.uri;
    let parsed = url::Url::parse(uri);
    let new_table_config =
        ListingDatabaseOptions::parse_from_map(&request.storage_options)?.new_table_config;

    // TODO: pass params regardless of OS
    let mut url = match parsed {
        // A single-character scheme on Windows is a drive letter, not a URL scheme
        Ok(url) if !(cfg!(windows) && url.scheme().len() == 1) => url,
        _ => {
            return Self::open_path(uri, request.read_consistency_interval, new_table_config)
                .await;
        }
    };

    // Walk the query params and pull out the ones that belong to lancedb
    // WARNING: specifying engine is NOT a publicly supported feature in lancedb yet
    // THE API WILL CHANGE
    let mut engine = None;
    let mut mirrored_store = None;
    let mut passthrough_pairs = Vec::new();
    for (key, value) in url.query_pairs() {
        match &*key {
            ENGINE => engine = Some(value.to_string()),
            MIRRORED_STORE => {
                if cfg!(windows) {
                    return Err(Error::NotSupported {
                        message: "mirrored store is not supported on windows".into(),
                    });
                }
                mirrored_store = Some(value.to_string());
            }
            // owned copies, because the url is modified below
            _ => passthrough_pairs.push((key.to_string(), value.to_string())),
        }
    }

    // Drop the lancedb-only params from the url and keep everything else
    url.query_pairs_mut().clear();
    url.query_pairs_mut().extend_pairs(passthrough_pairs);
    // Remember the remaining query string so it can be propagated to lance
    let query_string = url.query().map(String::from);
    // Strip the query so the url can serve as the base uri.  `set_query(None)` is
    // used rather than `set_query("")` because the latter leaves a trailing '?'.
    url.set_query(None);

    let plain_uri = url.to_string();
    let table_base_uri = match engine {
        Some(store) => {
            static WARN_ONCE: std::sync::Once = std::sync::Once::new();
            WARN_ONCE.call_once(|| {
                log::warn!("Specifying engine is not a publicly supported feature in lancedb yet. THE API WILL CHANGE");
            });
            let old_scheme = url.scheme().to_string();
            let new_scheme = format!("{}+{}", old_scheme, store);
            plain_uri.replacen(&old_scheme, &new_scheme, 1)
        }
        None => plain_uri.clone(),
    };

    let storage_options = request.storage_options.clone();
    let os_params = ObjectStoreParams {
        storage_options: Some(storage_options.clone()),
        ..Default::default()
    };
    let registry = Arc::new(ObjectStoreRegistry::default());
    let (object_store, base_path) =
        ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?;
    if object_store.is_local() {
        Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
    }

    let store_wrapper = if let Some(path) = mirrored_store {
        let mirror = Arc::new(LocalFileSystem::new_with_prefix(path)?);
        Some(Arc::new(MirroringObjectStoreWrapper::new(mirror)) as Arc<dyn WrappingObjectStore>)
    } else {
        None
    };

    Ok(Self {
        object_store,
        query_string,
        uri: table_base_uri,
        base_path,
        store_wrapper,
        read_consistency_interval: request.read_consistency_interval,
        storage_options,
        new_table_config,
    })
}
/// Open a database rooted at a plain path.
///
/// No query string, store wrapper or storage options are attached to the
/// resulting database.  When the resolved object store is local, the directory
/// is created if it does not exist yet.
async fn open_path(
    path: &str,
    read_consistency_interval: Option<std::time::Duration>,
    new_table_config: NewTableConfig,
) -> Result<Self> {
    let (object_store, base_path) = ObjectStore::from_uri(path).await?;
    // Only a local store has a directory that has to exist up front
    let dir_ready = if object_store.is_local() {
        Self::try_create_dir(path)
    } else {
        Ok(())
    };
    dir_ready.context(CreateDirSnafu { path })?;

    let database = Self {
        object_store,
        query_string: None,
        uri: path.to_string(),
        base_path,
        store_wrapper: None,
        read_consistency_interval,
        storage_options: HashMap::new(),
        new_table_config,
    };
    Ok(database)
}
/// Make sure a local directory for the lancedb dataset exists.
///
/// Nothing is done when something already exists at `path`; otherwise the
/// directory and all of its missing parents are created.
fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
    let dir = Path::new(path);
    if dir.try_exists()? {
        return Ok(());
    }
    create_dir_all(dir)
}
/// Get the URI of a table in the database.
///
/// The table is located at `<uri>/<name>.<LANCE_FILE_EXTENSION>`.  When the
/// connection kept a query string, it is appended after a `?`.
///
/// # Errors
///
/// Fails if `validate_table_name` rejects `name`, or with an invalid-table-name
/// error if the joined path cannot be represented as a UTF-8 string.
fn table_uri(&self, name: &str) -> Result<String> {
    validate_table_name(name)?;

    let file_name = format!("{}.{}", name, LANCE_FILE_EXTENSION);
    let joined = Path::new(&self.uri).join(file_name);
    let mut uri = String::from(joined.to_str().context(InvalidTableNameSnafu {
        name,
        reason: "Name is not valid URL",
    })?);

    // Propagate the connection's query string (if any) to lance
    if let Some(query) = &self.query_string {
        uri.push('?');
        uri.push_str(query);
    }
    Ok(uri)
}
}
#[async_trait::async_trait]
impl Database for ListingDatabase {
/// List the names of the tables in this database, in sorted order.
///
/// A table is any entry of the base path whose extension is `LANCE_EXTENSION`;
/// its name is the entry's file stem.  When `request.start_after` is set, only
/// names that sort strictly after it are returned, and `request.limit` caps the
/// number of names returned.
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
    let entries = self.object_store.read_dir(self.base_path.clone()).await?;

    let mut names: Vec<String> = entries
        .iter()
        .filter_map(|entry| {
            let path = Path::new(entry);
            let extension = path.extension()?.to_str()?;
            if extension != LANCE_EXTENSION {
                return None;
            }
            path.file_stem()?.to_str().map(String::from)
        })
        .collect();
    names.sort();

    if let Some(start_after) = request.start_after {
        // The list is sorted, so keeping the names greater than `start_after`
        // is the same as dropping the leading run of names up to it.
        names.retain(|name| name.as_str() > start_after.as_str());
    }
    if let Some(limit) = request.limit {
        names.truncate(limit as usize);
    }
    Ok(names)
}
async fn create_table(
&self,
mut request: CreateTableRequest,
) -> Result<Arc<dyn TableInternal>> {
let table_uri = self.table_uri(&request.name)?;
// Inherit storage options from the connection
let storage_options = request
.write_options
.lance_write_params
.get_or_insert_with(Default::default)
.store_params
.get_or_insert_with(Default::default)
.storage_options
.get_or_insert_with(Default::default);
for (key, value) in self.storage_options.iter() {
if !storage_options.contains_key(key) {
storage_options.insert(key.clone(), value.clone());
}
}
let storage_options = storage_options.clone();
let mut write_params = request.write_options.lance_write_params.unwrap_or_default();
if let Some(storage_version) = &self.new_table_config.data_storage_version {
write_params.data_storage_version = Some(*storage_version);
} else {
// Allow the user to override the storage version via storage options (backwards compatibility)
if let Some(data_storage_version) = storage_options.get(OPT_NEW_TABLE_STORAGE_VERSION) {
write_params.data_storage_version = Some(data_storage_version.parse()?);
}
}
if let Some(enable_v2_manifest_paths) = self.new_table_config.enable_v2_manifest_paths {
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
} else {
// Allow the user to override the storage version via storage options (backwards compatibility)
if let Some(enable_v2_manifest_paths) = storage_options
.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS)
.map(|s| s.parse::<bool>().unwrap())
{
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
}
if matches!(&request.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
let data = match request.data {
CreateTableData::Data(data) => data,
CreateTableData::Empty(table_definition) => {
let schema = table_definition.schema.clone();
Box::new(RecordBatchIterator::new(vec![], schema))
}
};
let data_schema = data.schema();
match NativeTable::create(
&table_uri,
&request.name,
data,
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
)
.await
{
Ok(table) => Ok(Arc::new(table)),
Err(Error::TableAlreadyExists { name }) => match request.mode {
CreateTableMode::Create => Err(Error::TableAlreadyExists { name }),
CreateTableMode::ExistOk(callback) => {
let req = OpenTableRequest {
name: request.name.clone(),
index_cache_size: None,
lance_read_params: None,
};
let req = (callback)(req);
let table = self.open_table(req).await?;
let table_schema = table.schema().await?;
if table_schema != data_schema {
return Err(Error::Schema {
message: "Provided schema does not match existing table schema"
.to_string(),
});
}
Ok(table)
}
CreateTableMode::Overwrite => unreachable!(),
},
Err(err) => Err(err),
}
}
/// Open an existing table of this database.
///
/// If the request carries user-provided `ReadParams`, they are used as given.
/// Otherwise default `ReadParams` are used, customised with the request's
/// `index_cache_size` when one is set.  In both cases storage options set on the
/// connection are inherited, without replacing options already present.
///
/// # Errors
///
/// Returns any error raised while validating the table name or opening the table.
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn TableInternal>> {
    let table_uri = self.table_uri(&request.name)?;

    // Some ReadParams are exposed in the OpenTableBuilder, but we also
    // let the user provide their own ReadParams.
    //
    // If we have a user provided ReadParams use that
    // If we don't then start with the default ReadParams and customize it with
    // the options from the OpenTableBuilder
    //
    // This has to be decided before the storage options are inherited: filling in
    // the storage options materialises the params, after which "the user did not
    // provide any" can no longer be detected and `index_cache_size` would be lost.
    let mut read_params = match request.lance_read_params.take() {
        Some(user_params) => user_params,
        None => {
            let mut default_params = ReadParams::default();
            if let Some(index_cache_size) = request.index_cache_size {
                default_params.index_cache_size = index_cache_size as usize;
            }
            default_params
        }
    };

    // Inherit storage options from the connection
    let storage_options = read_params
        .store_options
        .get_or_insert_with(Default::default)
        .storage_options
        .get_or_insert_with(Default::default);
    for (key, value) in self.storage_options.iter() {
        if !storage_options.contains_key(key) {
            storage_options.insert(key.clone(), value.clone());
        }
    }

    let native_table = Arc::new(
        NativeTable::open_with_params(
            &table_uri,
            &request.name,
            self.store_wrapper.clone(),
            Some(read_params),
            self.read_consistency_interval,
        )
        .await?,
    );
    Ok(native_table)
}
async fn rename_table(&self, _old_name: &str, _new_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "rename_table is not supported in LanceDB OSS".to_string(),
})
}
/// Drop the table called `name`.
///
/// The table's directory (`<name>.<LANCE_EXTENSION>` under the base path) is
/// removed from the object store, after which the commit handler resolved for
/// this database's URI is asked to delete that path as well.
///
/// # Errors
///
/// Returns [`Error::TableNotFound`] if the table directory does not exist.  Every
/// other failure - including one raised while resolving or invoking the commit
/// handler - is returned to the caller rather than causing a panic.
async fn drop_table(&self, name: &str) -> Result<()> {
    let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
    let full_path = self.base_path.child(dir_name);
    self.object_store
        .remove_dir_all(full_path.clone())
        .await
        .map_err(|err| match err {
            // `remove_dir_all` is a generic directory operation that knows nothing
            // about datasets, so a missing table is reported as `NotFound` and not
            // as `lance::Error::DatasetNotFound`
            lance::Error::NotFound { .. } => Error::TableNotFound {
                name: name.to_owned(),
            },
            _ => Error::from(err),
        })?;

    let object_store_params = ObjectStoreParams {
        storage_options: Some(self.storage_options.clone()),
        ..Default::default()
    };
    // Rebuild the connection URI, query string included, to resolve the commit handler
    let mut uri = self.uri.clone();
    if let Some(query_string) = &self.query_string {
        uri.push('?');
        uri.push_str(query_string);
    }
    let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?;
    commit_handler.delete(&full_path).await?;
    Ok(())
}
/// Drop the whole database by recursively removing everything under its base path.
async fn drop_db(&self) -> Result<()> {
    let root = self.base_path.clone();
    self.object_store.remove_dir_all(root).await?;
    Ok(())
}
/// Return this database as a [`std::any::Any`] reference, which allows
/// downcasting back to the concrete type.
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

View File

@@ -193,6 +193,7 @@
pub mod arrow;
pub mod connection;
pub mod data;
pub mod database;
pub mod embeddings;
pub mod error;
pub mod index;

View File

@@ -1015,7 +1015,7 @@ mod tests {
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use tempfile::tempdir;
use crate::{connect, connection::CreateTableMode, Table};
use crate::{connect, database::CreateTableMode, Table};
#[tokio::test]
async fn test_setters_getters() {

View File

@@ -15,7 +15,7 @@ use crate::remote::db::RemoteOptions;
const REQUEST_ID_HEADER: &str = "x-request-id";
/// Configuration for the LanceDB Cloud HTTP client.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ClientConfig {
pub timeout_config: TimeoutConfig,
pub retry_config: RetryConfig,
@@ -36,7 +36,7 @@ impl Default for ClientConfig {
}
/// How to handle timeouts for HTTP requests.
#[derive(Default, Debug)]
#[derive(Clone, Default, Debug)]
pub struct TimeoutConfig {
/// The timeout for creating a connection to the server.
///
@@ -62,7 +62,7 @@ pub struct TimeoutConfig {
}
/// How to handle retries for HTTP requests.
#[derive(Default, Debug)]
#[derive(Clone, Default, Debug)]
pub struct RetryConfig {
/// The number of times to retry a request if it fails.
///

View File

@@ -4,7 +4,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use arrow_array::RecordBatchIterator;
use async_trait::async_trait;
use http::StatusCode;
use lance_io::object_store::StorageOptions;
@@ -13,13 +13,12 @@ use reqwest::header::CONTENT_TYPE;
use serde::Deserialize;
use tokio::task::spawn_blocking;
use crate::connection::{
ConnectionInternal, CreateTableBuilder, CreateTableMode, NoData, OpenTableBuilder,
TableNamesBuilder,
use crate::database::{
CreateTableData, CreateTableMode, CreateTableRequest, Database, OpenTableRequest,
TableNamesRequest,
};
use crate::embeddings::EmbeddingRegistry;
use crate::error::Result;
use crate::Table;
use crate::table::TableInternal;
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
@@ -105,13 +104,13 @@ impl From<&CreateTableMode> for &'static str {
}
#[async_trait]
impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
async fn table_names(&self, options: TableNamesBuilder) -> Result<Vec<String>> {
impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
let mut req = self.client.get("/v1/table/");
if let Some(limit) = options.limit {
if let Some(limit) = request.limit {
req = req.query(&[("limit", limit)]);
}
if let Some(start_after) = options.start_after {
if let Some(start_after) = request.start_after {
req = req.query(&[("page_token", start_after)]);
}
let (request_id, rsp) = self.client.send(req, true).await?;
@@ -127,11 +126,15 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
Ok(tables)
}
async fn do_create_table(
&self,
options: CreateTableBuilder<false, NoData>,
data: Box<dyn RecordBatchReader + Send>,
) -> Result<Table> {
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn TableInternal>> {
let data = match request.data {
CreateTableData::Data(data) => data,
CreateTableData::Empty(table_definition) => {
let schema = table_definition.schema.clone();
Box::new(RecordBatchIterator::new(vec![], schema))
}
};
// TODO: https://github.com/lancedb/lancedb/issues/1026
// We should accept data from an async source. In the meantime, spawn this as blocking
// to make sure we don't block the tokio runtime if the source is slow.
@@ -141,8 +144,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
let req = self
.client
.post(&format!("/v1/table/{}/create/", options.name))
.query(&[("mode", Into::<&str>::into(&options.mode))])
.post(&format!("/v1/table/{}/create/", request.name))
.query(&[("mode", Into::<&str>::into(&request.mode))])
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
@@ -151,14 +154,18 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await.err_to_http(request_id.clone())?;
if body.contains("already exists") {
return match options.mode {
return match request.mode {
CreateTableMode::Create => {
Err(crate::Error::TableAlreadyExists { name: options.name })
Err(crate::Error::TableAlreadyExists { name: request.name })
}
CreateTableMode::ExistOk(callback) => {
let builder = OpenTableBuilder::new(options.parent, options.name);
let builder = (callback)(builder);
builder.execute().await
let req = OpenTableRequest {
name: request.name.clone(),
index_cache_size: None,
lance_read_params: None,
};
let req = (callback)(req);
self.open_table(req).await
}
// This should not happen, as we explicitly set the mode to overwrite and the server
@@ -183,31 +190,31 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
self.client.check_response(&request_id, rsp).await?;
self.table_cache.insert(options.name.clone(), ()).await;
self.table_cache.insert(request.name.clone(), ()).await;
Ok(Table::new(Arc::new(RemoteTable::new(
Ok(Arc::new(RemoteTable::new(
self.client.clone(),
options.name,
))))
request.name,
)))
}
async fn do_open_table(&self, options: OpenTableBuilder) -> Result<Table> {
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn TableInternal>> {
// We describe the table to confirm it exists before moving on.
if self.table_cache.get(&options.name).is_none() {
if self.table_cache.get(&request.name).is_none() {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", options.name));
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, resp) = self.client.send(req, true).await?;
if resp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: options.name });
return Err(crate::Error::TableNotFound { name: request.name });
}
self.client.check_response(&request_id, resp).await?;
}
Ok(Table::new(Arc::new(RemoteTable::new(
Ok(Arc::new(RemoteTable::new(
self.client.clone(),
options.name,
))))
request.name,
)))
}
async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> {
@@ -236,8 +243,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
})
}
fn embedding_registry(&self) -> &dyn EmbeddingRegistry {
todo!()
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
@@ -273,7 +280,7 @@ mod tests {
use crate::connection::ConnectBuilder;
use crate::{
connection::CreateTableMode,
database::CreateTableMode,
remote::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE},
Connection, Error,
};

View File

@@ -230,6 +230,24 @@ pub struct OptimizeStats {
pub prune: Option<RemovalStats>,
}
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
#[derive(Clone, Debug, Default)]
enum BadVectorHandling {
/// An error is returned
#[default]
Error,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The offending row is dropped
Drop,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The invalid/missing items are replaced by fill_value
Fill(f32),
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The invalid items are replaced by NULL
None,
}
/// Options to use when writing data
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
@@ -364,7 +382,7 @@ impl UpdateBuilder {
}
#[async_trait]
pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Sync {
pub trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Sync {
#[allow(dead_code)]
fn as_any(&self) -> &dyn std::any::Any;
/// Cast as [`NativeTable`], or return None if it is not a [`NativeTable`].
@@ -465,7 +483,7 @@ impl std::fmt::Display for Table {
}
impl Table {
pub(crate) fn new(inner: Arc<dyn TableInternal>) -> Self {
pub fn new(inner: Arc<dyn TableInternal>) -> Self {
Self {
inner,
embedding_registry: Arc::new(MemoryRegistry::new()),
@@ -1164,7 +1182,7 @@ impl NativeTable {
/// # Returns
///
/// * A [TableImpl] object.
pub(crate) async fn create(
pub async fn create(
uri: &str,
name: &str,
batches: impl RecordBatchReader + Send + 'static,