Compare commits


13 Commits

Author SHA1 Message Date
Lance Release
ebbeeff4e0 Bump version: 0.25.1-beta.2 → 0.25.1-beta.3 2025-09-22 04:47:42 +00:00
Jack Ye
407ca53f92 chore: increase pypi publish timeout and use warp runner for arm64 (#2670)
Fix failures like:
https://github.com/lancedb/lancedb/actions/runs/17840462235/job/50748940233

The ARM64 build cannot succeed within 1 hour, and the x86-64 build sometimes
cannot either.
2025-09-21 21:42:44 -07:00
Jack Ye
ff71d7e552 feat: support shallow clone (#2653)
Support shallow cloning a dataset at a specific location to create a new
dataset, using the shallow_clone feature in Lance. Also introduce a remote
`clone` API so remote tables can use this functionality.
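
A minimal sketch of the new Python API (paths and version numbers are illustrative; the async and remote connections expose the same parameters):

```python
import lancedb

db = lancedb.connect("data")

# Shallow-clone the latest version of a source table. The clone shares
# data, deletion, and index files with the source but has its own
# manifest, so both tables can evolve independently afterwards.
cloned = db.clone_table("cloned", "data/source.lance")

# Or clone from a pinned version or a tagged version of the source.
pinned = db.clone_table("cloned_v1", "data/source.lance", source_version=1)
tagged = db.clone_table("cloned_tag", "data/source.lance", source_tag="v1.0")
```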
2025-09-21 21:28:40 -07:00
Neha Prasad
2261eb95a0 fix(node): handle undefined vector fields with embedding functions (#2655)
- Fixes an issue where passing `{ vector: undefined }` with an embedding
function threw a "Found field not in schema" error instead of calling the
embedding function, as `null` or omitted fields do.

**Changes:**
- Modified `rowPathsAndValues` to skip undefined values during schema
inference
- Added test case verifying undefined, null, and omitted vector fields
all work correctly

**Before:** `{ vector: undefined }` → Error
**After:** `{ vector: undefined }` → Calls embedding function

Closes #2647
2025-09-19 09:17:28 -07:00
Jack Ye
5b397e410b chore: fix out of date tests with new namespace validation (#2663)
Failure:
https://github.com/lancedb/lancedb/actions/runs/17820044478/job/50660516344
2025-09-18 13:29:47 -07:00
Lance Release
b5a39bffec Bump version: 0.22.1-beta.1 → 0.22.1-beta.2 2025-09-18 20:22:35 +00:00
Lance Release
5e1e9add07 Bump version: 0.25.1-beta.1 → 0.25.1-beta.2 2025-09-18 20:21:33 +00:00
Jack Ye
97e9938dfe fix: add missing validations to namespace operations (#2659) 2025-09-17 23:27:04 -07:00
Weston Pace
1d4b92e01e refactor: remove catalog implementation now that we have namespaces in database (#2662)
We had previously prototyped a `Catalog` trait anticipating a
three-tiered Catalog-Database-Table structure. Now that we have
namespaces in the `Database`, we can support any tiering scheme and the
`Catalog` trait is no longer needed.
2025-09-17 08:40:20 -07:00
Le Duc Manh
4c9fc3044b fix: use create to resolve variables (#2640)
# What
- Use `create` to resolve variable values

# Reference
Fixes #2181
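
For example, a variable set on the registry is substituted only when the function is materialized through `create` (a sketch assuming an embedding function was registered under "variable-parsing-test", as in the new test):

```python
from lancedb.embeddings import EmbeddingFunctionRegistry

registry = EmbeddingFunctionRegistry.get_instance()
registry.set_var("test_api_key", "sk-test-key-12345")

# `create` resolves "$var:" references to registry variables, so a config
# parsed back from table metadata is rebuilt with the real value while
# safe_model_dump() still reports the "$var:" placeholder.
func = registry.get("variable-parsing-test").create(api_key="$var:test_api_key")
assert func.api_key == "sk-test-key-12345"
```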

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
2025-09-12 13:07:32 -07:00
Jack Ye
0ebc8d45a8 chore: fix no lock build warnings and CI timeouts (#2650)
Example CI failures:
- publish build timeout:
https://github.com/lancedb/lancedb/actions/runs/17626482881/job/50084552906
- doc test build timeout:
https://github.com/lancedb/lancedb/actions/runs/17627058590/job/50086456818
2025-09-11 15:30:35 -07:00
BubbleCal
f7d78c3420 feat: add 'target_partition_size' param (#2642)
This exposes the `target_partition_size` param from Lance.
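
A usage sketch mirroring the new tests (the `table` handle is illustrative; the parameter is also accepted by the IVF_FLAT, IVF_HNSW_PQ, and IVF_HNSW_SQ index types):

```python
# Cap the per-partition size when building the default IVF_PQ index.
# When unset, Lance picks a default (8192 for the IVF types,
# 1,048,576 for the HNSW variants).
table.create_index(
    metric="l2",
    vector_column_name="vector",
    num_sub_vectors=96,
    num_bits=4,
    target_partition_size=8192,
)
```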

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-09-11 22:56:16 +08:00
Lance Release
6ea6884260 Bump version: 0.22.1-beta.0 → 0.22.1-beta.1 2025-09-10 20:49:43 +00:00
54 changed files with 1994 additions and 898 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.22.1-beta.0"
current_version = "0.22.1-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -24,7 +24,8 @@ env:
jobs:
test-python:
name: Test doc python code
-runs-on: ubuntu-24.04
+runs-on: warp-ubuntu-2204-x64-8x
+timeout-minutes: 60
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -48,7 +49,6 @@ jobs:
uses: swatinem/rust-cache@v2
- name: Build Python
working-directory: docs/test
-timeout-minutes: 60
run:
python -m pip install --extra-index-url https://pypi.fury.io/lancedb/ -r requirements.txt
- name: Create test files

View File

@@ -56,7 +56,7 @@ jobs:
pypi_token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
fury_token: ${{ secrets.FURY_TOKEN }}
mac:
-timeout-minutes: 60
+timeout-minutes: 90
runs-on: ${{ matrix.config.runner }}
strategy:
matrix:
@@ -64,7 +64,7 @@ jobs:
- target: x86_64-apple-darwin
runner: macos-13
- target: aarch64-apple-darwin
-runner: macos-14
+runner: warp-macos-14-arm64-6x
env:
MACOSX_DEPLOYMENT_TARGET: 10.15
steps:

Cargo.lock (generated)
View File

@@ -713,9 +713,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.104.0"
version = "1.105.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38c488cd6abb0ec9811c401894191932e941c5f91dc226043edacd0afa1634bc"
checksum = "c99789e929b5e1d9a5aa3fa1d81317f3a789afc796141d11b0eaafd9d9f47e38"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -963,9 +963,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.9.1"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3946acbe1ead1301ba6862e712c7903ca9bb230bdf1fbd1b5ac54158ef2ab1f"
checksum = "4fa63ad37685ceb7762fa4d73d06f1d5493feb88e3f27259b9ed277f4c01b185"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -1153,7 +1153,7 @@ dependencies = [
"bitflags 2.9.4",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"itertools 0.11.0",
"lazy_static",
"lazycell",
"log",
@@ -4628,7 +4628,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.22.1-beta.0"
version = "0.22.1-beta.2"
dependencies = [
"arrow",
"arrow-array",
@@ -4715,7 +4715,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.22.1-beta.0"
version = "0.22.1-beta.2"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4735,7 +4735,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.25.1-beta.0"
version = "0.25.1-beta.2"
dependencies = [
"arrow",
"async-trait",

View File

@@ -16,6 +16,7 @@ pub trait JNIEnvExt {
fn get_integers(&mut self, obj: &JObject) -> Result<Vec<i32>>;
/// Get strings from Java List<String> object.
+#[allow(dead_code)]
fn get_strings(&mut self, obj: &JObject) -> Result<Vec<String>>;
/// Get strings from Java String[] object.

View File

@@ -6,6 +6,7 @@ use jni::JNIEnv;
use crate::Result;
+#[allow(dead_code)]
pub trait FromJObject<T> {
fn extract(&self) -> Result<T>;
}
@@ -39,6 +40,7 @@ impl FromJObject<f64> for JObject<'_> {
}
}
+#[allow(dead_code)]
pub trait FromJString {
fn extract(&self, env: &mut JNIEnv) -> Result<String>;
}
@@ -66,6 +68,7 @@ pub trait JMapExt {
fn get_f64(&self, env: &mut JNIEnv, key: &str) -> Result<Option<f64>>;
}
+#[allow(dead_code)]
fn get_map_value<T>(env: &mut JNIEnv, map: &JMap, key: &str) -> Result<Option<T>>
where
for<'a> JObject<'a>: FromJObject<T>,

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.1-beta.0</version>
<version>0.22.1-beta.2</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.1-beta.0</version>
<version>0.22.1-beta.2</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.22.1-beta.0</version>
<version>0.22.1-beta.2</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.22.1-beta.0"
version = "0.22.1-beta.2"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -203,3 +203,106 @@ describe("given a connection", () => {
});
});
});
describe("clone table functionality", () => {
let tmpDir: tmp.DirResult;
let db: Connection;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
db = await connect(tmpDir.name);
});
afterEach(() => tmpDir.removeCallback());
it("should clone a table with latest version (default behavior)", async () => {
// Create source table with some data
const data = [
{ id: 1, text: "hello", vector: [1.0, 2.0] },
{ id: 2, text: "world", vector: [3.0, 4.0] },
];
const sourceTable = await db.createTable("source", data);
// Add more data to create a new version
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
await sourceTable.add(moreData);
// Clone the table (should get latest version with 3 rows)
const sourceUri = `${tmpDir.name}/source.lance`;
const clonedTable = await db.cloneTable("cloned", sourceUri);
// Verify cloned table has all 3 rows
expect(await clonedTable.countRows()).toBe(3);
expect((await db.tableNames()).includes("cloned")).toBe(true);
});
it("should clone a table from a specific version", async () => {
// Create source table with initial data
const data = [
{ id: 1, text: "hello", vector: [1.0, 2.0] },
{ id: 2, text: "world", vector: [3.0, 4.0] },
];
const sourceTable = await db.createTable("source", data);
// Get the initial version
const initialVersion = await sourceTable.version();
// Add more data to create a new version
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
await sourceTable.add(moreData);
// Verify source now has 3 rows
expect(await sourceTable.countRows()).toBe(3);
// Clone from the initial version (should have only 2 rows)
const sourceUri = `${tmpDir.name}/source.lance`;
const clonedTable = await db.cloneTable("cloned", sourceUri, {
sourceVersion: initialVersion,
});
// Verify cloned table has only the initial 2 rows
expect(await clonedTable.countRows()).toBe(2);
});
it("should clone a table from a tagged version", async () => {
// Create source table with initial data
const data = [
{ id: 1, text: "hello", vector: [1.0, 2.0] },
{ id: 2, text: "world", vector: [3.0, 4.0] },
];
const sourceTable = await db.createTable("source", data);
// Create a tag for the current version
const tags = await sourceTable.tags();
await tags.create("v1.0", await sourceTable.version());
// Add more data after the tag
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
await sourceTable.add(moreData);
// Verify source now has 3 rows
expect(await sourceTable.countRows()).toBe(3);
// Clone from the tagged version (should have only 2 rows)
const sourceUri = `${tmpDir.name}/source.lance`;
const clonedTable = await db.cloneTable("cloned", sourceUri, {
sourceTag: "v1.0",
});
// Verify cloned table has only the tagged version's 2 rows
expect(await clonedTable.countRows()).toBe(2);
});
it("should fail when attempting deep clone", async () => {
// Create source table with some data
const data = [
{ id: 1, text: "hello", vector: [1.0, 2.0] },
{ id: 2, text: "world", vector: [3.0, 4.0] },
];
await db.createTable("source", data);
// Try to create a deep clone (should fail)
const sourceUri = `${tmpDir.name}/source.lance`;
await expect(
db.cloneTable("cloned", sourceUri, { isShallow: false }),
).rejects.toThrow("Deep clone is not yet implemented");
});
});

View File

@@ -256,6 +256,60 @@ describe("embedding functions", () => {
expect(actual).toHaveProperty("text");
});
it("should handle undefined vector field with embedding function correctly", async () => {
@register("undefined_test")
class MockEmbeddingFunction extends EmbeddingFunction<string> {
ndims() {
return 3;
}
embeddingDataType(): Float {
return new Float32();
}
async computeQueryEmbeddings(_data: string) {
return [1, 2, 3];
}
async computeSourceEmbeddings(data: string[]) {
return Array.from({ length: data.length }).fill([
1, 2, 3,
]) as number[][];
}
}
const func = getRegistry()
.get<MockEmbeddingFunction>("undefined_test")!
.create();
const schema = new Schema([
new Field("text", new Utf8(), true),
new Field(
"vector",
new FixedSizeList(3, new Field("item", new Float32(), true)),
true,
),
]);
const db = await connect(tmpDir.name);
const table = await db.createEmptyTable("test_undefined", schema, {
embeddingFunction: {
function: func,
sourceColumn: "text",
vectorColumn: "vector",
},
});
// Test that undefined, null, and omitted vector fields all work
await table.add([{ text: "test1", vector: undefined }]);
await table.add([{ text: "test2", vector: null }]);
await table.add([{ text: "test3" }]);
const rows = await table.query().toArray();
expect(rows.length).toBe(3);
// All rows should have vectors computed by the embedding function
for (const row of rows) {
expect(row.vector).toBeDefined();
expect(JSON.parse(JSON.stringify(row.vector))).toEqual([1, 2, 3]);
}
});
test.each([new Float16(), new Float32(), new Float64()])(
"should be able to provide manual embeddings with multiple float datatype",
async (floatType) => {

View File

@@ -512,7 +512,11 @@ function* rowPathsAndValues(
if (isObject(value)) {
yield* rowPathsAndValues(value, [...basePath, key]);
} else {
-yield [[...basePath, key], value];
+// Skip undefined values - they should be treated the same as missing fields
+// for embedding function purposes
+if (value !== undefined) {
+yield [[...basePath, key], value];
+}
}
}
}

View File

@@ -268,6 +268,33 @@ export abstract class Connection {
* @param {string[]} namespace The namespace to drop tables from (defaults to root namespace).
*/
abstract dropAllTables(namespace?: string[]): Promise<void>;
/**
* Clone a table from a source table.
*
* A shallow clone creates a new table that shares the underlying data files
* with the source table but has its own independent manifest. This allows
* both the source and cloned tables to evolve independently while initially
* sharing the same data, deletion, and index files.
*
* @param {string} targetTableName - The name of the target table to create.
* @param {string} sourceUri - The URI of the source table to clone from.
* @param {object} options - Clone options.
* @param {string[]} options.targetNamespace - The namespace for the target table (defaults to root namespace).
* @param {number} options.sourceVersion - The version of the source table to clone.
* @param {string} options.sourceTag - The tag of the source table to clone.
* @param {boolean} options.isShallow - Whether to perform a shallow clone (defaults to true).
*/
abstract cloneTable(
targetTableName: string,
sourceUri: string,
options?: {
targetNamespace?: string[];
sourceVersion?: number;
sourceTag?: string;
isShallow?: boolean;
},
): Promise<Table>;
}
/** @hideconstructor */
@@ -332,6 +359,28 @@ export class LocalConnection extends Connection {
return new LocalTable(innerTable);
}
async cloneTable(
targetTableName: string,
sourceUri: string,
options?: {
targetNamespace?: string[];
sourceVersion?: number;
sourceTag?: string;
isShallow?: boolean;
},
): Promise<Table> {
const innerTable = await this.inner.cloneTable(
targetTableName,
sourceUri,
options?.targetNamespace ?? [],
options?.sourceVersion ?? null,
options?.sourceTag ?? null,
options?.isShallow ?? true,
);
return new LocalTable(innerTable);
}
private getStorageOptions(
options?: Partial<CreateTableOptions>,
): Record<string, string> | undefined {

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.22.1-beta.0",
"version": "0.22.1-beta.2",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -213,6 +213,36 @@ impl Connection {
Ok(Table::new(tbl))
}
#[napi(catch_unwind)]
pub async fn clone_table(
&self,
target_table_name: String,
source_uri: String,
target_namespace: Vec<String>,
source_version: Option<i64>,
source_tag: Option<String>,
is_shallow: bool,
) -> napi::Result<Table> {
let mut builder = self
.get_inner()?
.clone_table(&target_table_name, &source_uri);
builder = builder.target_namespace(target_namespace);
if let Some(version) = source_version {
builder = builder.source_version(version as u64);
}
if let Some(tag) = source_tag {
builder = builder.source_tag(tag);
}
builder = builder.is_shallow(is_shallow);
let tbl = builder.execute().await.default_error()?;
Ok(Table::new(tbl))
}
/// Drop table with the name. Or raise an error if the table does not exist.
#[napi(catch_unwind)]
pub async fn drop_table(&self, name: String, namespace: Vec<String>) -> napi::Result<()> {

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.25.1-beta.1"
current_version = "0.25.1-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.25.1-beta.1"
version = "0.25.1-beta.3"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -60,6 +60,15 @@ class Connection(object):
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table: ...
async def clone_table(
self,
target_table_name: str,
source_uri: str,
target_namespace: List[str] = [],
source_version: Optional[int] = None,
source_tag: Optional[str] = None,
is_shallow: bool = True,
) -> Table: ...
async def rename_table(
self,
cur_name: str,

View File

@@ -665,6 +665,60 @@ class LanceDBConnection(DBConnection):
index_cache_size=index_cache_size,
)
def clone_table(
self,
target_table_name: str,
source_uri: str,
*,
target_namespace: List[str] = [],
source_version: Optional[int] = None,
source_tag: Optional[str] = None,
is_shallow: bool = True,
) -> LanceTable:
"""Clone a table from a source table.
A shallow clone creates a new table that shares the underlying data files
with the source table but has its own independent manifest. This allows
both the source and cloned tables to evolve independently while initially
sharing the same data, deletion, and index files.
Parameters
----------
target_table_name: str
The name of the target table to create.
source_uri: str
The URI of the source table to clone from.
target_namespace: List[str], optional
The namespace for the target table.
None or empty list represents root namespace.
source_version: int, optional
The version of the source table to clone.
source_tag: str, optional
The tag of the source table to clone.
is_shallow: bool, default True
Whether to perform a shallow clone (True) or deep clone (False).
Currently only shallow clone is supported.
Returns
-------
A LanceTable object representing the cloned table.
"""
LOOP.run(
self._conn.clone_table(
target_table_name,
source_uri,
target_namespace=target_namespace,
source_version=source_version,
source_tag=source_tag,
is_shallow=is_shallow,
)
)
return LanceTable.open(
self,
target_table_name,
namespace=target_namespace,
)
@override
def drop_table(
self,
@@ -1136,6 +1190,54 @@ class AsyncConnection(object):
)
return AsyncTable(table)
async def clone_table(
self,
target_table_name: str,
source_uri: str,
*,
target_namespace: List[str] = [],
source_version: Optional[int] = None,
source_tag: Optional[str] = None,
is_shallow: bool = True,
) -> AsyncTable:
"""Clone a table from a source table.
A shallow clone creates a new table that shares the underlying data files
with the source table but has its own independent manifest. This allows
both the source and cloned tables to evolve independently while initially
sharing the same data, deletion, and index files.
Parameters
----------
target_table_name: str
The name of the target table to create.
source_uri: str
The URI of the source table to clone from.
target_namespace: List[str], optional
The namespace for the target table.
None or empty list represents root namespace.
source_version: int, optional
The version of the source table to clone.
source_tag: str, optional
The tag of the source table to clone.
is_shallow: bool, default True
Whether to perform a shallow clone (True) or deep clone (False).
Currently only shallow clone is supported.
Returns
-------
An AsyncTable object representing the cloned table.
"""
table = await self._inner.clone_table(
target_table_name,
source_uri,
target_namespace=target_namespace,
source_version=source_version,
source_tag=source_tag,
is_shallow=is_shallow,
)
return AsyncTable(table)
async def rename_table(
self,
cur_name: str,

View File

@@ -122,7 +122,7 @@ class EmbeddingFunctionRegistry:
obj["vector_column"]: EmbeddingFunctionConfig(
vector_column=obj["vector_column"],
source_column=obj["source_column"],
function=self.get(obj["name"])(**obj["model"]),
function=self.get(obj["name"]).create(**obj["model"]),
)
for obj in raw_list
}

View File

@@ -251,6 +251,13 @@ class HnswPq:
results. In most cases, there is no benefit to setting this higher than 500.
This value should be set to a value that is not less than `ef` in the
search phase.
+target_partition_size, default is 1,048,576
+The target size of each partition.
+This value controls the tradeoff between search performance and accuracy:
+higher values give faster search but less accurate results.
"""
distance_type: Literal["l2", "cosine", "dot"] = "l2"
@@ -261,6 +268,7 @@ class HnswPq:
sample_rate: int = 256
m: int = 20
ef_construction: int = 300
+target_partition_size: Optional[int] = None
@dataclass
@@ -351,6 +359,12 @@ class HnswSq:
This value should be set to a value that is not less than `ef` in the search
phase.
+target_partition_size, default is 1,048,576
+The target size of each partition.
+This value controls the tradeoff between search performance and accuracy:
+higher values give faster search but less accurate results.
"""
distance_type: Literal["l2", "cosine", "dot"] = "l2"
@@ -359,6 +373,7 @@ class HnswSq:
sample_rate: int = 256
m: int = 20
ef_construction: int = 300
target_partition_size: Optional[int] = None
@dataclass
@@ -444,12 +459,20 @@ class IvfFlat:
cases the default should be sufficient.
The default value is 256.
+target_partition_size, default is 8192
+The target size of each partition.
+This value controls the tradeoff between search performance and accuracy:
+higher values give faster search but less accurate results.
"""
distance_type: Literal["l2", "cosine", "dot", "hamming"] = "l2"
num_partitions: Optional[int] = None
max_iterations: int = 50
sample_rate: int = 256
+target_partition_size: Optional[int] = None
@dataclass
@@ -564,6 +587,13 @@ class IvfPq:
cases the default should be sufficient.
The default value is 256.
+target_partition_size, default is 8192
+The target size of each partition.
+This value controls the tradeoff between search performance and accuracy:
+higher values give faster search but less accurate results.
"""
distance_type: Literal["l2", "cosine", "dot"] = "l2"
@@ -572,6 +602,7 @@ class IvfPq:
num_bits: int = 8
max_iterations: int = 50
sample_rate: int = 256
+target_partition_size: Optional[int] = None
__all__ = [

View File

@@ -212,6 +212,53 @@ class RemoteDBConnection(DBConnection):
table = LOOP.run(self._conn.open_table(name, namespace=namespace))
return RemoteTable(table, self.db_name)
def clone_table(
self,
target_table_name: str,
source_uri: str,
*,
target_namespace: List[str] = [],
source_version: Optional[int] = None,
source_tag: Optional[str] = None,
is_shallow: bool = True,
) -> Table:
"""Clone a table from a source table.
Parameters
----------
target_table_name: str
The name of the target table to create.
source_uri: str
The URI of the source table to clone from.
target_namespace: List[str], optional
The namespace for the target table.
None or empty list represents root namespace.
source_version: int, optional
The version of the source table to clone.
source_tag: str, optional
The tag of the source table to clone.
is_shallow: bool, default True
Whether to perform a shallow clone (True) or deep clone (False).
Currently only shallow clone is supported.
Returns
-------
A RemoteTable object representing the cloned table.
"""
from .table import RemoteTable
table = LOOP.run(
self._conn.clone_table(
target_table_name,
source_uri,
target_namespace=target_namespace,
source_version=source_version,
source_tag=source_tag,
is_shallow=is_shallow,
)
)
return RemoteTable(table, self.db_name)
@override
def create_table(
self,

View File

@@ -691,6 +691,7 @@ class Table(ABC):
ef_construction: int = 300,
name: Optional[str] = None,
train: bool = True,
+target_partition_size: Optional[int] = None,
):
"""Create an index on the table.
@@ -2002,6 +2003,7 @@ class LanceTable(Table):
*,
name: Optional[str] = None,
train: bool = True,
+target_partition_size: Optional[int] = None,
):
"""Create an index on the table."""
if accelerator is not None:
@@ -2018,6 +2020,7 @@ class LanceTable(Table):
num_bits=num_bits,
m=m,
ef_construction=ef_construction,
+target_partition_size=target_partition_size,
)
self.checkout_latest()
return
@@ -2027,6 +2030,7 @@ class LanceTable(Table):
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
+target_partition_size=target_partition_size,
)
elif index_type == "IVF_PQ":
config = IvfPq(
@@ -2036,6 +2040,7 @@ class LanceTable(Table):
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
+target_partition_size=target_partition_size,
)
elif index_type == "IVF_HNSW_PQ":
config = HnswPq(
@@ -2047,6 +2052,7 @@ class LanceTable(Table):
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
+target_partition_size=target_partition_size,
)
elif index_type == "IVF_HNSW_SQ":
config = HnswSq(
@@ -2056,6 +2062,7 @@ class LanceTable(Table):
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
+target_partition_size=target_partition_size,
)
else:
raise ValueError(f"Unknown index type {index_type}")

View File

@@ -747,15 +747,16 @@ def test_local_namespace_operations(tmp_path):
# Create a local database connection
db = lancedb.connect(tmp_path)
-# Test list_namespaces returns empty list
+# Test list_namespaces returns empty list for root namespace
namespaces = list(db.list_namespaces())
assert namespaces == []
-# Test list_namespaces with parameters still returns empty list
-namespaces_with_params = list(
-db.list_namespaces(namespace=["test"], page_token="token", limit=5)
-)
-assert namespaces_with_params == []
+# Test list_namespaces with non-empty namespace raises NotImplementedError
+with pytest.raises(
+NotImplementedError,
+match="Namespace operations are not supported for listing database",
+):
+list(db.list_namespaces(namespace=["test"]))
def test_local_create_namespace_not_supported(tmp_path):
@@ -830,3 +831,119 @@ def test_local_table_operations_with_namespace_raise_error(tmp_path):
# Test table_names without namespace - should work normally
tables_root = list(db.table_names())
assert "test_table" in tables_root
def test_clone_table_latest_version(tmp_path):
"""Test cloning a table with the latest version (default behavior)"""
import os
db = lancedb.connect(tmp_path)
# Create source table with some data
data = [
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
]
source_table = db.create_table("source", data=data)
# Add more data to create a new version
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
source_table.add(more_data)
# Clone the table (should get latest version with 3 rows)
source_uri = os.path.join(tmp_path, "source.lance")
cloned_table = db.clone_table("cloned", source_uri)
# Verify cloned table has all 3 rows
assert cloned_table.count_rows() == 3
assert "cloned" in db.table_names()
# Verify data matches
cloned_data = cloned_table.to_pandas()
assert len(cloned_data) == 3
assert set(cloned_data["id"].tolist()) == {1, 2, 3}
def test_clone_table_specific_version(tmp_path):
"""Test cloning a table from a specific version"""
import os
db = lancedb.connect(tmp_path)
# Create source table with initial data
data = [
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
]
source_table = db.create_table("source", data=data)
# Get the initial version
initial_version = source_table.version
# Add more data to create a new version
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
source_table.add(more_data)
# Verify source now has 3 rows
assert source_table.count_rows() == 3
# Clone from the initial version (should have only 2 rows)
source_uri = os.path.join(tmp_path, "source.lance")
cloned_table = db.clone_table("cloned", source_uri, source_version=initial_version)
# Verify cloned table has only the initial 2 rows
assert cloned_table.count_rows() == 2
cloned_data = cloned_table.to_pandas()
assert set(cloned_data["id"].tolist()) == {1, 2}
def test_clone_table_with_tag(tmp_path):
"""Test cloning a table from a tagged version"""
import os
db = lancedb.connect(tmp_path)
# Create source table with initial data
data = [
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
]
source_table = db.create_table("source", data=data)
# Create a tag for the current version
source_table.tags.create("v1.0", source_table.version)
# Add more data after the tag
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
source_table.add(more_data)
# Verify source now has 3 rows
assert source_table.count_rows() == 3
# Clone from the tagged version (should have only 2 rows)
source_uri = os.path.join(tmp_path, "source.lance")
cloned_table = db.clone_table("cloned", source_uri, source_tag="v1.0")
# Verify cloned table has only the tagged version's 2 rows
assert cloned_table.count_rows() == 2
cloned_data = cloned_table.to_pandas()
assert set(cloned_data["id"].tolist()) == {1, 2}
def test_clone_table_deep_clone_fails(tmp_path):
"""Test that deep clone raises an unsupported error"""
import os
db = lancedb.connect(tmp_path)
# Create source table with some data
data = [
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
]
db.create_table("source", data=data)
# Try to create a deep clone (should fail)
source_uri = os.path.join(tmp_path, "source.lance")
with pytest.raises(Exception, match="Deep clone is not yet implemented"):
db.clone_table("cloned", source_uri, is_shallow=False)

View File

@@ -114,6 +114,63 @@ def test_embedding_function_variables():
assert func.safe_model_dump()["secret_key"] == "$var:secret"
def test_parse_functions_with_variables():
@register("variable-parsing-test")
class VariableParsingFunction(TextEmbeddingFunction):
api_key: str
base_url: Optional[str] = None
@staticmethod
def sensitive_keys():
return ["api_key"]
def ndims(self):
return 10
def generate_embeddings(self, texts):
# Mock implementation that just returns random embeddings
# In real usage, this would use the api_key to call an API
return [np.random.rand(self.ndims()).tolist() for _ in texts]
registry = EmbeddingFunctionRegistry.get_instance()
registry.set_var("test_api_key", "sk-test-key-12345")
registry.set_var("test_base_url", "https://api.example.com")
conf = EmbeddingFunctionConfig(
source_column="text",
vector_column="vector",
function=registry.get("variable-parsing-test").create(
api_key="$var:test_api_key", base_url="$var:test_base_url"
),
)
metadata = registry.get_table_metadata([conf])
# Create a mock arrow table with the metadata
schema = pa.schema(
[pa.field("text", pa.string()), pa.field("vector", pa.list_(pa.float32(), 10))]
)
table = pa.table({"text": [], "vector": []}, schema=schema)
table = table.replace_schema_metadata(metadata)
ds = lance.write_dataset(table, "memory://")
configs = registry.parse_functions(ds.schema.metadata)
assert "vector" in configs
parsed_func = configs["vector"].function
assert parsed_func.api_key == "sk-test-key-12345"
assert parsed_func.base_url == "https://api.example.com"
embeddings = parsed_func.generate_embeddings(["test text"])
assert len(embeddings) == 1
assert len(embeddings[0]) == 10
assert parsed_func.safe_model_dump()["api_key"] == "$var:test_api_key"
def test_embedding_with_bad_results(tmp_path):
@register("null-embedding")
class NullEmbeddingFunction(TextEmbeddingFunction):

View File

@@ -674,6 +674,45 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
"vector", replace=True, config=expected_config, name=None, train=True
)
# Test with target_partition_size
table.create_index(
metric="l2",
num_sub_vectors=96,
vector_column_name="vector",
replace=True,
index_cache_size=256,
num_bits=4,
target_partition_size=8192,
)
expected_config = IvfPq(
distance_type="l2",
num_sub_vectors=96,
num_bits=4,
target_partition_size=8192,
)
mock_create_index.assert_called_with(
"vector", replace=True, config=expected_config, name=None, train=True
)
# target_partition_size has a default value,
# so `num_partitions` and `target_partition_size` are not required
table.create_index(
metric="l2",
num_sub_vectors=96,
vector_column_name="vector",
replace=True,
index_cache_size=256,
num_bits=4,
)
expected_config = IvfPq(
distance_type="l2",
num_sub_vectors=96,
num_bits=4,
)
mock_create_index.assert_called_with(
"vector", replace=True, config=expected_config, name=None, train=True
)
table.create_index(
vector_column_name="my_vector",
metric="dot",

View File

@@ -163,6 +163,34 @@ impl Connection {
})
}
#[pyo3(signature = (target_table_name, source_uri, target_namespace=vec![], source_version=None, source_tag=None, is_shallow=true))]
pub fn clone_table(
self_: PyRef<'_, Self>,
target_table_name: String,
source_uri: String,
target_namespace: Vec<String>,
source_version: Option<u64>,
source_tag: Option<String>,
is_shallow: bool,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
let mut builder = inner.clone_table(target_table_name, source_uri);
builder = builder.target_namespace(target_namespace);
if let Some(version) = source_version {
builder = builder.source_version(version);
}
if let Some(tag) = source_tag {
builder = builder.source_tag(tag);
}
builder = builder.is_shallow(is_shallow);
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))
})
}
#[pyo3(signature = (cur_name, new_name, cur_namespace=vec![], new_namespace=vec![]))]
pub fn rename_table(
self_: PyRef<'_, Self>,
@@ -255,7 +283,7 @@ impl Connection {
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
-py: Python,
+py: Python<'_>,
uri: String,
api_key: Option<String>,
region: Option<String>,

View File

@@ -63,6 +63,9 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
if let Some(num_partitions) = params.num_partitions {
ivf_flat_builder = ivf_flat_builder.num_partitions(num_partitions);
}
+if let Some(target_partition_size) = params.target_partition_size {
+ivf_flat_builder = ivf_flat_builder.target_partition_size(target_partition_size);
+}
Ok(LanceDbIndex::IvfFlat(ivf_flat_builder))
},
"IvfPq" => {
@@ -76,6 +79,9 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
if let Some(num_partitions) = params.num_partitions {
ivf_pq_builder = ivf_pq_builder.num_partitions(num_partitions);
}
+if let Some(target_partition_size) = params.target_partition_size {
+ivf_pq_builder = ivf_pq_builder.target_partition_size(target_partition_size);
+}
if let Some(num_sub_vectors) = params.num_sub_vectors {
ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors);
}
@@ -94,6 +100,9 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
if let Some(num_partitions) = params.num_partitions {
hnsw_pq_builder = hnsw_pq_builder.num_partitions(num_partitions);
}
+if let Some(target_partition_size) = params.target_partition_size {
+hnsw_pq_builder = hnsw_pq_builder.target_partition_size(target_partition_size);
+}
if let Some(num_sub_vectors) = params.num_sub_vectors {
hnsw_pq_builder = hnsw_pq_builder.num_sub_vectors(num_sub_vectors);
}
@@ -111,6 +120,9 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
if let Some(num_partitions) = params.num_partitions {
hnsw_sq_builder = hnsw_sq_builder.num_partitions(num_partitions);
}
+if let Some(target_partition_size) = params.target_partition_size {
+hnsw_sq_builder = hnsw_sq_builder.target_partition_size(target_partition_size);
+}
Ok(LanceDbIndex::IvfHnswSq(hnsw_sq_builder))
},
not_supported => Err(PyValueError::new_err(format!(
@@ -144,6 +156,7 @@ struct IvfFlatParams {
num_partitions: Option<u32>,
max_iterations: u32,
sample_rate: u32,
target_partition_size: Option<u32>,
}
#[derive(FromPyObject)]
@@ -154,6 +167,7 @@ struct IvfPqParams {
num_bits: u32,
max_iterations: u32,
sample_rate: u32,
target_partition_size: Option<u32>,
}
#[derive(FromPyObject)]
@@ -166,6 +180,7 @@ struct IvfHnswPqParams {
sample_rate: u32,
m: u32,
ef_construction: u32,
target_partition_size: Option<u32>,
}
#[derive(FromPyObject)]
@@ -176,6 +191,7 @@ struct IvfHnswSqParams {
sample_rate: u32,
m: u32,
ef_construction: u32,
target_partition_size: Option<u32>,
}
#[pyclass(get_all)]

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.22.1-beta.0"
version = "0.22.1-beta.2"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -86,11 +86,11 @@ rand = { version = "0.9", features = ["small_rng"] }
random_word = { version = "0.4.3", features = ["en"] }
uuid = { version = "1.7.0", features = ["v4"] }
walkdir = "2"
aws-sdk-dynamodb = { version = "1.38.0" }
aws-sdk-s3 = { version = "1.38.0" }
aws-sdk-kms = { version = "1.37" }
aws-config = { version = "1.0" }
aws-smithy-runtime = { version = "1.3" }
aws-sdk-dynamodb = { version = "1.55.0" }
aws-sdk-s3 = { version = "1.55.0" }
aws-sdk-kms = { version = "1.48.0" }
aws-config = { version = "1.5.10" }
aws-smithy-runtime = { version = "1.9.1" }
datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"

View File

@@ -1,86 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation for managing databases
pub mod listing;
use std::collections::HashMap;
use std::sync::Arc;
use crate::database::Database;
use crate::error::Result;
use async_trait::async_trait;
pub trait CatalogOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
}
/// Request parameters for listing databases
#[derive(Clone, Debug, Default)]
pub struct DatabaseNamesRequest {
/// Start listing after this name (exclusive)
pub start_after: Option<String>,
/// Maximum number of names to return
pub limit: Option<u32>,
}
/// Request to open an existing database
#[derive(Clone, Debug)]
pub struct OpenDatabaseRequest {
/// The name of the database to open
pub name: String,
/// A map of database-specific options
///
/// Consult the catalog / database implementation to determine which options are available
pub database_options: HashMap<String, String>,
}
/// Database creation mode
///
/// The default behavior is Create
pub enum CreateDatabaseMode {
/// Create new database, error if exists
Create,
/// Open existing database if present
ExistOk,
/// Overwrite existing database
Overwrite,
}
impl Default for CreateDatabaseMode {
fn default() -> Self {
Self::Create
}
}
/// Request to create a new database
pub struct CreateDatabaseRequest {
/// The name of the database to create
pub name: String,
/// The creation mode
pub mode: CreateDatabaseMode,
/// A map of catalog-specific options, consult your catalog implementation to determine what's available
pub options: HashMap<String, String>,
}
#[async_trait]
pub trait Catalog: Send + Sync + std::fmt::Debug + 'static {
/// List database names with pagination
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>>;
/// Create a new database
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Open existing database
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Rename database
async fn rename_database(&self, old_name: &str, new_name: &str) -> Result<()>;
/// Delete database
async fn drop_database(&self, name: &str) -> Result<()>;
/// Delete all databases
async fn drop_all_databases(&self) -> Result<()>;
}

View File

@@ -1,624 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation based on a local file system.
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;
use super::{
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
use async_trait::async_trait;
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_io::local::to_local_path;
use object_store::path::Path as ObjectStorePath;
use snafu::ResultExt;
/// Options for the listing catalog
///
/// Note: the catalog will use the `storage_options` configured on
/// db_options to configure storage for listing / creating / deleting
/// databases.
#[derive(Clone, Debug, Default)]
pub struct ListingCatalogOptions {
/// The options to use for databases opened by this catalog
///
/// This also contains the storage options used by the catalog
pub db_options: ListingDatabaseOptions,
}
impl CatalogOptions for ListingCatalogOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>) {
self.db_options.serialize_into_map(map);
}
}
impl ListingCatalogOptions {
pub fn builder() -> ListingCatalogOptionsBuilder {
ListingCatalogOptionsBuilder::new()
}
pub(crate) fn parse_from_map(map: &HashMap<String, String>) -> Result<Self> {
let db_options = ListingDatabaseOptions::parse_from_map(map)?;
Ok(Self { db_options })
}
}
#[derive(Clone, Debug, Default)]
pub struct ListingCatalogOptionsBuilder {
options: ListingCatalogOptions,
}
impl ListingCatalogOptionsBuilder {
pub fn new() -> Self {
Self {
options: ListingCatalogOptions::default(),
}
}
pub fn db_options(mut self, db_options: ListingDatabaseOptions) -> Self {
self.options.db_options = db_options;
self
}
pub fn build(self) -> ListingCatalogOptions {
self.options
}
}
/// A catalog implementation that works by listing subfolders in a directory
///
/// The listing catalog will be created with a base folder specified by the URI. Every subfolder
/// in this base folder will be considered a database. These will be opened as a
/// [`crate::database::listing::ListingDatabase`]
#[derive(Debug)]
pub struct ListingCatalog {
object_store: Arc<ObjectStore>,
uri: String,
base_path: ObjectStorePath,
options: ListingCatalogOptions,
}
impl ListingCatalog {
/// Try to create a local directory to store the lancedb dataset
pub fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(path)?;
}
Ok(())
}
pub fn uri(&self) -> &str {
&self.uri
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_uri(path).await?;
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
Ok(Self {
uri: path.to_string(),
base_path,
object_store,
options: ListingCatalogOptions::default(),
})
}
pub async fn connect(request: &ConnectRequest) -> Result<Self> {
let uri = &request.uri;
let parse_res = url::Url::parse(uri);
let options = ListingCatalogOptions::parse_from_map(&request.options)?;
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(url) => {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = options.db_options.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
Ok(Self {
uri: String::from(url.clone()),
base_path,
object_store,
options,
})
}
Err(_) => Self::open_path(uri).await,
}
}
fn database_path(&self, name: &str) -> ObjectStorePath {
self.base_path.child(name.replace('\\', "/"))
}
}
#[async_trait]
impl Catalog for ListingCatalog {
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>> {
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter_map(|p| p.file_name().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
if let Some(start_after) = request.start_after {
let index = f
.iter()
.position(|name| name.as_str() > start_after.as_str())
.unwrap_or(f.len());
f.drain(0..index);
}
if let Some(limit) = request.limit {
f.truncate(limit as usize);
}
Ok(f)
}
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
match request.mode {
CreateDatabaseMode::Create if exists => {
return Err(Error::DatabaseAlreadyExists { name: request.name })
}
CreateDatabaseMode::Create => {
create_dir_all(db_path.to_string()).unwrap();
}
CreateDatabaseMode::ExistOk => {
if !exists {
create_dir_all(db_path.to_string()).unwrap();
}
}
CreateDatabaseMode::Overwrite => {
if exists {
self.drop_database(&request.name).await?;
}
create_dir_all(db_path.to_string()).unwrap();
}
}
let db_uri = format!("/{}/{}", self.base_path, request.name);
let mut connect_request = ConnectRequest {
uri: db_uri,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
options: Default::default(),
session: None,
};
// Add the db options to the connect request
self.options
.db_options
.serialize_into_map(&mut connect_request.options);
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
if !exists {
return Err(Error::DatabaseNotFound { name: request.name });
}
let mut connect_request = ConnectRequest {
uri: db_path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
options: Default::default(),
session: None,
};
// Add the db options to the connect request
self.options
.db_options
.serialize_into_map(&mut connect_request.options);
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn rename_database(&self, _old_name: &str, _new_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "rename_database is not supported in LanceDB OSS yet".to_string(),
})
}
async fn drop_database(&self, name: &str) -> Result<()> {
let db_path = self.database_path(name);
self.object_store
.remove_dir_all(db_path.clone())
.await
.map_err(|err| match err {
lance::Error::NotFound { .. } => Error::DatabaseNotFound {
name: name.to_owned(),
},
_ => Error::from(err),
})?;
Ok(())
}
async fn drop_all_databases(&self) -> Result<()> {
self.object_store
.remove_dir_all(self.base_path.clone())
.await?;
Ok(())
}
}
#[cfg(all(test, not(windows)))]
mod tests {
use super::*;
/// file:/// URIs with drive letters do not work correctly on Windows
#[cfg(windows)]
fn path_to_uri(path: PathBuf) -> String {
path.to_str().unwrap().to_string()
}
#[cfg(not(windows))]
fn path_to_uri(path: PathBuf) -> String {
Url::from_file_path(path).unwrap().to_string()
}
async fn setup_catalog() -> (TempDir, ListingCatalog) {
let tempdir = tempfile::tempdir().unwrap();
let catalog_path = tempdir.path().join("catalog");
std::fs::create_dir_all(&catalog_path).unwrap();
let uri = path_to_uri(catalog_path);
let request = ConnectRequest {
uri: uri.clone(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
session: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
(tempdir, catalog)
}
use crate::database::{CreateTableData, CreateTableRequest, TableNamesRequest};
use crate::table::TableDefinition;
use arrow_schema::Field;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use url::Url;
#[tokio::test]
async fn test_database_names() {
let (_tempdir, catalog) = setup_catalog().await;
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_create_database() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(names, vec!["db1"]);
}
#[tokio::test]
async fn test_create_database_exist_ok() {
let (_tempdir, catalog) = setup_catalog().await;
let db1 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db1.create_table(CreateTableRequest {
name: "test_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
namespace: vec![],
})
.await
.unwrap();
let db2 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let tables = db2.table_names(TableNamesRequest::default()).await.unwrap();
assert_eq!(tables, vec!["test_table".to_string()]);
}
#[tokio::test]
async fn test_create_database_overwrite() {
let (_tempdir, catalog) = setup_catalog().await;
let db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db.create_table(CreateTableRequest {
name: "old_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
namespace: vec![],
})
.await
.unwrap();
let tables = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(!tables.is_empty());
let new_db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let tables = new_db
.table_names(TableNamesRequest::default())
.await
.unwrap();
assert!(tables.is_empty());
}
#[tokio::test]
async fn test_create_database_overwrite_non_existing() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "new_db".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.contains(&"new_db".to_string()));
}
#[tokio::test]
async fn test_open_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Test open non-existent
let result = catalog
.open_database(OpenDatabaseRequest {
name: "missing".into(),
database_options: HashMap::new(),
})
.await;
assert!(matches!(
result.unwrap_err(),
Error::DatabaseNotFound { name } if name == "missing"
));
// Create and open
catalog
.create_database(CreateDatabaseRequest {
name: "valid_db".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let db = catalog
.open_database(OpenDatabaseRequest {
name: "valid_db".into(),
database_options: HashMap::new(),
})
.await
.unwrap();
assert_eq!(
db.table_names(TableNamesRequest::default()).await.unwrap(),
Vec::<String>::new()
);
}
#[tokio::test]
async fn test_drop_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Create test database
catalog
.create_database(CreateDatabaseRequest {
name: "to_drop".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(!names.is_empty());
// Drop database
catalog.drop_database("to_drop").await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_drop_all_databases() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog
.create_database(CreateDatabaseRequest {
name: "db2".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog.drop_all_databases().await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_rename_database_unsupported() {
let (_tempdir, catalog) = setup_catalog().await;
let result = catalog.rename_database("old", "new").await;
assert!(matches!(
result.unwrap_err(),
Error::NotSupported { message } if message.contains("rename_database")
));
}
#[tokio::test]
async fn test_connect_local_path() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let request = ConnectRequest {
uri: path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
session: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, path);
}
#[tokio::test]
async fn test_connect_file_scheme() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path();
let uri = path_to_uri(path.to_path_buf());
let request = ConnectRequest {
uri: uri.clone(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
session: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, uri);
}
#[tokio::test]
async fn test_connect_invalid_uri_fallback() {
let invalid_uri = "invalid:///path";
let request = ConnectRequest {
uri: invalid_uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
session: None,
};
let result = ListingCatalog::connect(&request).await;
assert!(result.is_err());
}
}

View File

@@ -13,15 +13,13 @@ use lance::dataset::ReadParams;
use object_store::aws::AwsCredential;
use crate::arrow::{IntoArrow, IntoArrowStream, SendableRecordBatchStream};
-use crate::catalog::listing::ListingCatalog;
-use crate::catalog::CatalogOptions;
use crate::database::listing::{
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::database::{
-CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
-DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
-TableNamesRequest,
+CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
+CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
+OpenTableRequest, TableNamesRequest,
};
use crate::embeddings::{
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings,
@@ -471,6 +469,62 @@ impl OpenTableBuilder {
}
}
/// Builder for cloning a table.
///
/// A shallow clone creates a new table that shares the underlying data files
/// with the source table but has its own independent manifest. Both the source
/// and cloned tables can evolve independently while initially sharing the same
/// data, deletion, and index files.
///
/// Use this builder to configure the clone operation before executing it.
pub struct CloneTableBuilder {
parent: Arc<dyn Database>,
request: CloneTableRequest,
}
impl CloneTableBuilder {
fn new(parent: Arc<dyn Database>, target_table_name: String, source_uri: String) -> Self {
Self {
parent,
request: CloneTableRequest::new(target_table_name, source_uri),
}
}
/// Set the source version to clone from
pub fn source_version(mut self, version: u64) -> Self {
self.request.source_version = Some(version);
self
}
/// Set the source tag to clone from
pub fn source_tag(mut self, tag: impl Into<String>) -> Self {
self.request.source_tag = Some(tag.into());
self
}
/// Set the target namespace for the cloned table
pub fn target_namespace(mut self, namespace: Vec<String>) -> Self {
self.request.target_namespace = namespace;
self
}
/// Set whether to perform a shallow clone (default: true)
///
/// When true, the cloned table shares data files with the source table.
/// When false, performs a deep clone (not yet implemented).
pub fn is_shallow(mut self, is_shallow: bool) -> Self {
self.request.is_shallow = is_shallow;
self
}
/// Execute the clone operation
pub async fn execute(self) -> Result<Table> {
Ok(Table::new(
self.parent.clone().clone_table(self.request).await?,
))
}
}
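// Illustrative sketch (not part of this change): end-to-end use of the
// builder above, with a hypothetical connection, table name, and source URI.
// Note that the listing database rejects requests that set both a source
// version and a source tag, so pick at most one.
async fn clone_example(db: &Connection) -> Result<Table> {
    db.clone_table("reviews_2024", "/data/reviews.lance")
        .source_tag("v1.0") // or .source_version(42)
        .execute()
        .await
}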
/// A connection to LanceDB
#[derive(Clone)]
pub struct Connection {
@@ -577,6 +631,30 @@ impl Connection {
)
}
/// Clone a table in the database
///
/// Creates a new table by cloning from an existing source table.
/// By default, this performs a shallow clone where the new table shares
/// the underlying data files with the source table.
///
/// # Parameters
/// - `target_table_name`: The name of the new table to create
/// - `source_uri`: The URI of the source table to clone from
///
/// # Returns
/// A [`CloneTableBuilder`] that can be used to configure the clone operation
pub fn clone_table(
&self,
target_table_name: impl Into<String>,
source_uri: impl Into<String>,
) -> CloneTableBuilder {
CloneTableBuilder::new(
self.internal.clone(),
target_table_name.into(),
source_uri.into(),
)
}
/// Rename a table in the database.
///
/// This is only supported in LanceDB Cloud.
@@ -660,7 +738,7 @@ pub struct ConnectRequest {
#[cfg(feature = "remote")]
pub client_config: ClientConfig,
/// Database/Catalog specific options
/// Database-specific options
pub options: HashMap<String, String>,
/// The interval at which to check for updates from other processes.
@@ -937,50 +1015,6 @@ pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}
/// A builder for configuring a connection to a LanceDB catalog
#[derive(Debug)]
pub struct CatalogConnectBuilder {
request: ConnectRequest,
}
impl CatalogConnectBuilder {
/// Create a new [`CatalogConnectBuilder`] with the given catalog URI.
pub fn new(uri: &str) -> Self {
Self {
request: ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
options: HashMap::new(),
session: None,
},
}
}
pub fn catalog_options(mut self, catalog_options: &dyn CatalogOptions) -> Self {
catalog_options.serialize_into_map(&mut self.request.options);
self
}
/// Establishes a connection to the catalog
pub async fn execute(self) -> Result<Arc<ListingCatalog>> {
let catalog = ListingCatalog::connect(&self.request).await?;
Ok(Arc::new(catalog))
}
}
/// Connect to a LanceDB catalog.
///
/// A catalog is a container for databases, which in turn are containers for tables.
///
/// # Arguments
///
/// * `uri` - URI where the catalog is located, can be a local directory or supported remote cloud storage.
pub fn connect_catalog(uri: &str) -> CatalogConnectBuilder {
CatalogConnectBuilder::new(uri)
}
#[cfg(all(test, feature = "remote"))]
mod test_utils {
use super::*;
@@ -1022,7 +1056,6 @@ mod test_utils {
mod tests {
use std::fs::create_dir_all;
use crate::catalog::{Catalog, DatabaseNamesRequest, OpenDatabaseRequest};
use crate::database::listing::{ListingDatabaseOptions, NewTableConfig};
use crate::query::QueryBase;
use crate::query::{ExecutableQuery, QueryExecutionOptions};
@@ -1330,89 +1363,48 @@ mod tests {
}
#[tokio::test]
async fn test_connect_catalog() {
async fn test_clone_table() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let catalog = connect_catalog(uri).execute().await.unwrap();
let db = connect(uri).execute().await.unwrap();
// Verify that we can get the uri from the catalog
let catalog_uri = catalog.uri();
assert_eq!(catalog_uri, uri);
// Create a source table with some data
let mut batch_gen = BatchGenerator::new()
.col(Box::new(IncrementingInt32::new().named("id")))
.col(Box::new(IncrementingInt32::new().named("value")));
let reader = batch_gen.batches(5, 100);
// Check that the catalog is initially empty
let dbs = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(dbs.len(), 0);
}
#[tokio::test]
#[cfg(not(windows))]
async fn test_catalog_create_database() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let catalog = connect_catalog(uri).execute().await.unwrap();
let db_name = "test_db";
catalog
.create_database(crate::catalog::CreateDatabaseRequest {
name: db_name.to_string(),
mode: Default::default(),
options: Default::default(),
})
let source_table = db
.create_table("source_table", reader)
.execute()
.await
.unwrap();
let dbs = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(dbs.len(), 1);
assert_eq!(dbs[0], db_name);
// Get the source table URI
let source_table_path = tmp_dir.path().join("source_table.lance");
let source_uri = source_table_path.to_str().unwrap();
let db = catalog
.open_database(OpenDatabaseRequest {
name: db_name.to_string(),
database_options: HashMap::new(),
})
// Clone the table
let cloned_table = db
.clone_table("cloned_table", source_uri)
.execute()
.await
.unwrap();
let tables = db.table_names(Default::default()).await.unwrap();
assert_eq!(tables.len(), 0);
}
// Verify the cloned table exists
let table_names = db.table_names().execute().await.unwrap();
assert!(table_names.contains(&"source_table".to_string()));
assert!(table_names.contains(&"cloned_table".to_string()));
#[tokio::test]
#[cfg(not(windows))]
async fn test_catalog_drop_database() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let catalog = connect_catalog(uri).execute().await.unwrap();
// Verify the cloned table has the same schema
assert_eq!(
source_table.schema().await.unwrap(),
cloned_table.schema().await.unwrap()
);
// Create and then drop a database
let db_name = "test_db_to_drop";
catalog
.create_database(crate::catalog::CreateDatabaseRequest {
name: db_name.to_string(),
mode: Default::default(),
options: Default::default(),
})
.await
.unwrap();
let dbs = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(dbs.len(), 1);
catalog.drop_database(db_name).await.unwrap();
let dbs_after = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(dbs_after.len(), 0);
// Verify the cloned table has the same data
let source_count = source_table.count_rows(None).await.unwrap();
let cloned_count = cloned_table.count_rows(None).await.unwrap();
assert_eq!(source_count, cloned_count);
}
}

View File

@@ -176,6 +176,42 @@ impl CreateTableRequest {
}
}
/// Request to clone a table from a source table.
///
/// A shallow clone creates a new table that shares the underlying data files
/// with the source table but has its own independent manifest. This allows
/// both the source and cloned tables to evolve independently while initially
/// sharing the same data, deletion, and index files.
#[derive(Clone, Debug)]
pub struct CloneTableRequest {
/// The name of the target table to create
pub target_table_name: String,
/// The namespace for the target table. Empty list represents root namespace.
pub target_namespace: Vec<String>,
/// The URI of the source table to clone from.
pub source_uri: String,
/// Optional version of the source table to clone.
pub source_version: Option<u64>,
/// Optional tag of the source table to clone.
pub source_tag: Option<String>,
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
/// Currently only shallow clone is supported.
pub is_shallow: bool,
}
impl CloneTableRequest {
pub fn new(target_table_name: String, source_uri: String) -> Self {
Self {
target_table_name,
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
}
}
}
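// Illustrative sketch (not part of this change): a request defaults to a
// shallow clone of the latest source version into the root namespace, and
// individual fields can be overridden with struct-update syntax.
fn example_clone_request() -> CloneTableRequest {
    CloneTableRequest {
        source_version: Some(5), // pin the clone to source version 5
        ..CloneTableRequest::new("clone".to_string(), "/data/src.lance".to_string())
    }
}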
/// The `Database` trait defines the interface for database implementations.
///
/// A database is responsible for managing tables and their metadata.
@@ -193,6 +229,13 @@ pub trait Database:
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
/// Create a table in the database
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Clone a table in the database.
///
/// Creates a shallow clone of the source table, sharing underlying data files
/// but with an independent manifest. Both tables can evolve separately after cloning.
///
/// See [`CloneTableRequest`] for detailed documentation and examples.
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Rename a table in the database

View File

@@ -7,7 +7,8 @@ use std::fs::create_dir_all;
use std::path::Path;
use std::{collections::HashMap, sync::Arc};
use lance::dataset::{ReadParams, WriteMode};
use lance::dataset::refs::Ref;
use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode};
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
use lance_datafusion::utils::StreamingWriteSource;
use lance_encoding::version::LanceFileVersion;
@@ -22,8 +23,8 @@ use crate::table::NativeTable;
use crate::utils::validate_table_name;
use super::{
BaseTable, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest,
Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
TableNamesRequest,
};
@@ -587,7 +588,13 @@ impl ListingDatabase {
#[async_trait::async_trait]
impl Database for ListingDatabase {
async fn list_namespaces(&self, _request: ListNamespacesRequest) -> Result<Vec<String>> {
async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result<Vec<String>> {
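// The listing database is flat: only the root namespace exists, so listing
// the children of any non-root namespace is unsupported.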
if !request.namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace operations are not supported for listing database".into(),
});
}
Ok(Vec::new())
}
@@ -678,6 +685,65 @@ impl Database for ListingDatabase {
}
}
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.target_namespace.is_empty() {
return Err(Error::NotSupported {
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
});
}
// TODO: support deep clone
if !request.is_shallow {
return Err(Error::NotSupported {
message: "Deep clone is not yet implemented".to_string(),
});
}
validate_table_name(&request.target_table_name)?;
let storage_params = ObjectStoreParams {
storage_options: Some(self.storage_options.clone()),
..Default::default()
};
let read_params = ReadParams {
store_options: Some(storage_params.clone()),
session: Some(self.session.clone()),
..Default::default()
};
let mut source_dataset = DatasetBuilder::from_uri(&request.source_uri)
.with_read_params(read_params.clone())
.load()
.await
.map_err(|e| Error::Lance { source: e })?;
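// Resolve which source version to clone: an explicit version, a named tag,
// or (when neither is given) the current version of the loaded source dataset.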
let version_ref = match (request.source_version, request.source_tag) {
(Some(v), None) => Ok(Ref::Version(v)),
(None, Some(tag)) => Ok(Ref::Tag(tag)),
(None, None) => Ok(Ref::Version(source_dataset.version().version)),
_ => Err(Error::InvalidInput {
message: "Cannot specify both source_version and source_tag".to_string(),
}),
}?;
let target_uri = self.table_uri(&request.target_table_name)?;
source_dataset
.shallow_clone(&target_uri, version_ref, storage_params)
.await
.map_err(|e| Error::Lance { source: e })?;
let cloned_table = NativeTable::open_with_params(
&target_uri,
&request.target_table_name,
self.store_wrapper.clone(),
None,
self.read_consistency_interval,
)
.await?;
Ok(Arc::new(cloned_table))
}
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
if !request.namespace.is_empty() {
return Err(Error::NotSupported {
@@ -779,3 +845,694 @@ impl Database for ListingDatabase {
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::connection::ConnectRequest;
use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest};
use crate::table::{Table, TableDefinition};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use tempfile::tempdir;
async fn setup_database() -> (tempfile::TempDir, ListingDatabase) {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
(tempdir, db)
}
#[tokio::test]
async fn test_clone_table_basic() {
let (_tempdir, db) = setup_database().await;
// Create a source table with schema
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let source_table = db
.create_table(CreateTableRequest {
name: "source_table".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema.clone())),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
// Get the source table URI
let source_uri = db.table_uri("source_table").unwrap();
// Clone the table
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_table".to_string(),
target_namespace: vec![],
source_uri: source_uri.clone(),
source_version: None,
source_tag: None,
is_shallow: true,
})
.await
.unwrap();
// Verify both tables exist
let table_names = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(table_names.contains(&"source_table".to_string()));
assert!(table_names.contains(&"cloned_table".to_string()));
// Verify schemas match
assert_eq!(
source_table.schema().await.unwrap(),
cloned_table.schema().await.unwrap()
);
}
#[tokio::test]
async fn test_clone_table_with_data() {
let (_tempdir, db) = setup_database().await;
// Create a source table with actual data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch)],
schema.clone(),
));
let source_table = db
.create_table(CreateTableRequest {
name: "source_with_data".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source_with_data").unwrap();
// Clone the table
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_with_data".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await
.unwrap();
// Verify data counts match
let source_count = source_table.count_rows(None).await.unwrap();
let cloned_count = cloned_table.count_rows(None).await.unwrap();
assert_eq!(source_count, cloned_count);
assert_eq!(source_count, 3);
}
#[tokio::test]
async fn test_clone_table_with_storage_options() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
// Create database with storage options
let mut options = HashMap::new();
options.insert("test_option".to_string(), "test_value".to_string());
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: options.clone(),
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
// Create source table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source").unwrap();
// Clone should work with storage options
let cloned = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await;
assert!(cloned.is_ok());
}
#[tokio::test]
async fn test_clone_table_deep_not_supported() {
let (_tempdir, db) = setup_database().await;
// Create a source table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source").unwrap();
// Try deep clone (should fail)
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: false, // Request deep clone
})
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
Error::NotSupported { message } if message.contains("Deep clone")
));
}
#[tokio::test]
async fn test_clone_table_with_namespace_not_supported() {
let (_tempdir, db) = setup_database().await;
// Create a source table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source").unwrap();
// Try clone with namespace (should fail for listing database)
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec!["namespace".to_string()], // Non-empty namespace
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
Error::NotSupported { message } if message.contains("Namespace parameter is not supported")
));
}
#[tokio::test]
async fn test_clone_table_invalid_target_name() {
let (_tempdir, db) = setup_database().await;
// Create a source table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source").unwrap();
// Try clone with invalid target name
let result = db
.clone_table(CloneTableRequest {
target_table_name: "invalid/name".to_string(), // Invalid name with slash
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_clone_table_source_not_found() {
let (_tempdir, db) = setup_database().await;
// Try to clone from non-existent source
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
source_uri: "/nonexistent/table.lance".to_string(),
source_version: None,
source_tag: None,
is_shallow: true,
})
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_clone_table_with_version_and_tag_error() {
let (_tempdir, db) = setup_database().await;
// Create a source table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
db.create_table(CreateTableRequest {
name: "source".to_string(),
namespace: vec![],
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("source").unwrap();
// Try clone with both version and tag (should fail)
let result = db
.clone_table(CloneTableRequest {
target_table_name: "cloned".to_string(),
target_namespace: vec![],
source_uri,
source_version: Some(1),
source_tag: Some("v1.0".to_string()),
is_shallow: true,
})
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
Error::InvalidInput { message } if message.contains("Cannot specify both source_version and source_tag")
));
}
#[tokio::test]
async fn test_clone_table_with_specific_version() {
let (_tempdir, db) = setup_database().await;
// Create a source table with initial data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch1)],
schema.clone(),
));
let source_table = db
.create_table(CreateTableRequest {
name: "versioned_source".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
// Get the initial version
let initial_version = source_table.version().await.unwrap();
// Add more data to create a new version
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let source_table_obj = Table::new(source_table.clone());
source_table_obj
.add(Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch2)],
schema.clone(),
)))
.execute()
.await
.unwrap();
// Verify source table now has 4 rows
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
let source_uri = db.table_uri("versioned_source").unwrap();
// Clone from the initial version (should have only 2 rows)
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_from_version".to_string(),
target_namespace: vec![],
source_uri,
source_version: Some(initial_version),
source_tag: None,
is_shallow: true,
})
.await
.unwrap();
// Verify cloned table has only the initial 2 rows
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
// Source table should still have 4 rows
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
}
#[tokio::test]
async fn test_clone_table_with_tag() {
let (_tempdir, db) = setup_database().await;
// Create a source table with initial data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch1)],
schema.clone(),
));
let source_table = db
.create_table(CreateTableRequest {
name: "tagged_source".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
// Create a tag for the current version
let source_table_obj = Table::new(source_table.clone());
let mut tags = source_table_obj.tags().await.unwrap();
tags.create("v1.0", source_table.version().await.unwrap())
.await
.unwrap();
// Add more data after the tag
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let source_table_obj = Table::new(source_table.clone());
source_table_obj
.add(Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch2)],
schema.clone(),
)))
.execute()
.await
.unwrap();
// Source table should have 4 rows
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
let source_uri = db.table_uri("tagged_source").unwrap();
// Clone from the tag (should have only 2 rows)
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_from_tag".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: Some("v1.0".to_string()),
is_shallow: true,
})
.await
.unwrap();
// Verify cloned table has only the tagged version's 2 rows
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
}
#[tokio::test]
async fn test_cloned_tables_evolve_independently() {
let (_tempdir, db) = setup_database().await;
// Create a source table with initial data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch1)],
schema.clone(),
));
let source_table = db
.create_table(CreateTableRequest {
name: "independent_source".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
let source_uri = db.table_uri("independent_source").unwrap();
// Clone the table
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "independent_clone".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await
.unwrap();
// Both should start with 2 rows
assert_eq!(source_table.count_rows(None).await.unwrap(), 2);
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
// Add data to the cloned table
let batch_clone = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4, 5])),
Arc::new(StringArray::from(vec!["c", "d", "e"])),
],
)
.unwrap();
let cloned_table_obj = Table::new(cloned_table.clone());
cloned_table_obj
.add(Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch_clone)],
schema.clone(),
)))
.execute()
.await
.unwrap();
// Add different data to the source table
let batch_source = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![10, 11])),
Arc::new(StringArray::from(vec!["x", "y"])),
],
)
.unwrap();
let source_table_obj = Table::new(source_table.clone());
source_table_obj
.add(Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch_source)],
schema.clone(),
)))
.execute()
.await
.unwrap();
// Verify they have evolved independently
assert_eq!(source_table.count_rows(None).await.unwrap(), 4); // 2 + 2
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 5); // 2 + 3
}
#[tokio::test]
async fn test_clone_latest_version() {
let (_tempdir, db) = setup_database().await;
// Create a source table with initial data
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch1 =
RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch1)],
schema.clone(),
));
let source_table = db
.create_table(CreateTableRequest {
name: "latest_version_source".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
})
.await
.unwrap();
// Add more data to create new versions
for i in 0..3 {
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![i * 10, i * 10 + 1]))],
)
.unwrap();
let source_table_obj = Table::new(source_table.clone());
source_table_obj
.add(Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch)],
schema.clone(),
)))
.execute()
.await
.unwrap();
}
// Source should have 8 rows total (2 + 2 + 2 + 2)
let source_count = source_table.count_rows(None).await.unwrap();
assert_eq!(source_count, 8);
let source_uri = db.table_uri("latest_version_source").unwrap();
// Clone without specifying version or tag (should get latest)
let cloned_table = db
.clone_table(CloneTableRequest {
target_table_name: "cloned_latest".to_string(),
target_namespace: vec![],
source_uri,
source_version: None,
source_tag: None,
is_shallow: true,
})
.await
.unwrap();
// Cloned table should have all 8 rows from the latest version
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 8);
}
}

View File

@@ -45,10 +45,10 @@ use crate::{
pub trait EmbeddingFunction: std::fmt::Debug + Send + Sync {
fn name(&self) -> &str;
/// The type of the input data
fn source_type(&self) -> Result<Cow<DataType>>;
fn source_type(&self) -> Result<Cow<'_, DataType>>;
/// The type of the output data
/// This should **always** match the output of the `embed` function
fn dest_type(&self) -> Result<Cow<DataType>>;
fn dest_type(&self) -> Result<Cow<'_, DataType>>;
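// Note: the `'_` added above only makes the previously elided lifetime
// explicit (the returned `Cow` borrows from `&self`, exactly as before);
// newer lints such as `elided_lifetimes_in_paths` require spelling it out.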
/// Compute the embeddings for the source column in the database
fn compute_source_embeddings(&self, source: Arc<dyn Array>) -> Result<Arc<dyn Array>>;
/// Compute the embeddings for a given user query

View File

@@ -75,11 +75,11 @@ impl EmbeddingFunction for BedrockEmbeddingFunction {
"bedrock"
}
fn source_type(&self) -> Result<Cow<DataType>> {
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> Result<Cow<DataType>> {
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
let n_dims = self.model.ndims();
Ok(Cow::Owned(DataType::new_fixed_size_list(
DataType::Float32,

View File

@@ -144,11 +144,11 @@ impl EmbeddingFunction for OpenAIEmbeddingFunction {
"openai"
}
fn source_type(&self) -> Result<Cow<DataType>> {
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> Result<Cow<DataType>> {
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
let n_dims = self.model.ndims();
Ok(Cow::Owned(DataType::new_fixed_size_list(
DataType::Float32,

View File

@@ -407,11 +407,11 @@ impl EmbeddingFunction for SentenceTransformersEmbeddings {
"sentence-transformers"
}
fn source_type(&self) -> crate::Result<std::borrow::Cow<arrow_schema::DataType>> {
fn source_type(&self) -> crate::Result<std::borrow::Cow<'_, arrow_schema::DataType>> {
Ok(Cow::Owned(DataType::Utf8))
}
fn dest_type(&self) -> crate::Result<std::borrow::Cow<arrow_schema::DataType>> {
fn dest_type(&self) -> crate::Result<std::borrow::Cow<'_, arrow_schema::DataType>> {
let (n_dims, dtype) = self.compute_ndims_and_dtype()?;
Ok(Cow::Owned(DataType::new_fixed_size_list(
dtype,

View File

@@ -112,6 +112,15 @@ macro_rules! impl_ivf_params_setter {
self.max_iterations = max_iterations;
self
}
/// The target size of each partition.
///
/// This value controls the tradeoff between search performance and accuracy.
/// The higher the value, the faster the search, but the less accurate the results will be.
pub fn target_partition_size(mut self, target_partition_size: u32) -> Self {
self.target_partition_size = Some(target_partition_size);
self
}
};
}
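// Illustrative sketch (not part of this change; assumes the crate's public
// index API): the generated setter is exposed on every IVF-based builder,
// e.g. when building an IVF_PQ index:
//
//     let index = Index::IvfPq(
//         IvfPqIndexBuilder::default().target_partition_size(8192),
//     );
//     table.create_index(&["vector"], index).execute().await?;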
@@ -182,6 +191,7 @@ pub struct IvfFlatIndexBuilder {
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
pub(crate) target_partition_size: Option<u32>,
}
impl Default for IvfFlatIndexBuilder {
@@ -191,6 +201,7 @@ impl Default for IvfFlatIndexBuilder {
num_partitions: None,
sample_rate: 256,
max_iterations: 50,
target_partition_size: None,
}
}
}
@@ -228,6 +239,7 @@ pub struct IvfPqIndexBuilder {
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
pub(crate) target_partition_size: Option<u32>,
// PQ
pub(crate) num_sub_vectors: Option<u32>,
@@ -243,6 +255,7 @@ impl Default for IvfPqIndexBuilder {
num_bits: None,
sample_rate: 256,
max_iterations: 50,
target_partition_size: None,
}
}
}
@@ -293,6 +306,7 @@ pub struct IvfHnswPqIndexBuilder {
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
pub(crate) target_partition_size: Option<u32>,
// HNSW
pub(crate) m: u32,
@@ -314,6 +328,7 @@ impl Default for IvfHnswPqIndexBuilder {
max_iterations: 50,
m: 20,
ef_construction: 300,
target_partition_size: None,
}
}
}
@@ -341,6 +356,7 @@ pub struct IvfHnswSqIndexBuilder {
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
pub(crate) target_partition_size: Option<u32>,
// HNSW
pub(crate) m: u32,
@@ -358,6 +374,7 @@ impl Default for IvfHnswSqIndexBuilder {
max_iterations: 50,
m: 20,
ef_construction: 300,
target_partition_size: None,
}
}
}

View File

@@ -191,7 +191,6 @@
//! ```
pub mod arrow;
pub mod catalog;
pub mod connection;
pub mod data;
pub mod database;

View File

@@ -14,9 +14,9 @@ use serde::Deserialize;
use tokio::task::spawn_blocking;
use crate::database::{
CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
TableNamesRequest,
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
OpenTableRequest, TableNamesRequest,
};
use crate::error::Result;
use crate::table::BaseTable;
@@ -27,6 +27,18 @@ use super::table::RemoteTable;
use super::util::{batches_to_ipc_bytes, parse_server_version};
use super::ARROW_STREAM_CONTENT_TYPE;
// Request structure for the remote clone table API
#[derive(serde::Serialize)]
struct RemoteCloneTableRequest {
source_location: String,
#[serde(skip_serializing_if = "Option::is_none")]
source_version: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
source_tag: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
is_shallow: Option<bool>,
}
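// For illustration (hypothetical values): with `skip_serializing_if`, a
// shallow clone pinned to tag "v1.0" serializes as
//   {"source_location":"s3://bucket/src","source_tag":"v1.0","is_shallow":true}
// and the unset `source_version` is omitted from the body entirely.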
// The versions of the server that we support.
// For any new feature that requires changing SDK behavior, we should bump the
// server version and add a feature flag as a method of `ServerVersion` here.
@@ -430,6 +442,51 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(table)
}
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
let table_identifier = build_table_identifier(
&request.target_table_name,
&request.target_namespace,
&self.client.id_delimiter,
);
let remote_request = RemoteCloneTableRequest {
source_location: request.source_uri,
source_version: request.source_version,
source_tag: request.source_tag,
is_shallow: Some(request.is_shallow),
};
let req = self
.client
.post(&format!("/v1/table/{}/clone", table_identifier.clone()))
.json(&remote_request);
let (request_id, rsp) = self.client.send(req).await?;
let status = rsp.status();
if status != StatusCode::OK {
let body = rsp.text().await.err_to_http(request_id.clone())?;
return Err(crate::Error::Http {
source: format!("Failed to clone table: {}", body).into(),
request_id,
status_code: Some(status),
});
}
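// On success the server reports the new table's version in its response;
// parse it and register the remote table handle in the local cache.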
let version = parse_server_version(&request_id, &rsp)?;
let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace);
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.target_table_name.clone(),
request.target_namespace.clone(),
table_identifier,
version,
));
self.table_cache.insert(cache_key, table.clone()).await;
Ok(table)
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier =
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
@@ -1221,4 +1278,146 @@ mod tests {
_ => panic!("Expected Runtime error from header provider"),
}
}
#[tokio::test]
async fn test_clone_table() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_version() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["source_version"], 42);
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.source_version(42)
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_tag() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["source_tag"], "v1.0");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.source_tag("v1.0")
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_deep_clone() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], false);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.is_shallow(false)
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_with_namespace() {
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$cloned_table/clone");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["source_location"], "s3://bucket/source_table");
assert_eq!(body["is_shallow"], true);
http::Response::builder().status(200).body("").unwrap()
});
let table = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.target_namespace(vec!["ns1".to_string(), "ns2".to_string()])
.execute()
.await
.unwrap();
assert_eq!(table.name(), "cloned_table");
}
#[tokio::test]
async fn test_clone_table_error() {
let conn = Connection::new_with_handler(|_| {
http::Response::builder()
.status(500)
.body("Internal server error")
.unwrap()
});
let result = conn
.clone_table("cloned_table", "s3://bucket/source_table")
.execute()
.await;
assert!(result.is_err());
if let Err(crate::Error::Http { source, .. }) = result {
assert!(source.to_string().contains("Failed to clone table"));
} else {
panic!("Expected HTTP error");
}
}
}

View File

@@ -242,17 +242,15 @@ pub struct OptimizeStats {
/// Describes what happens when a vector either contains NaN or
/// does not have enough values
#[derive(Clone, Debug, Default)]
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
enum BadVectorHandling {
/// An error is returned
#[default]
Error,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The offending row is dropped
Drop,
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The invalid/missing items are replaced by fill_value
Fill(f32),
#[allow(dead_code)] // https://github.com/lancedb/lancedb/issues/992
/// The invalid items are replaced by NULL
None,
}

View File

@@ -20,6 +20,8 @@ use datafusion_physical_plan::SendableRecordBatchStream;
lazy_static! {
static ref TABLE_NAME_REGEX: regex::Regex = regex::Regex::new(r"^[a-zA-Z0-9_\-\.]+$").unwrap();
static ref NAMESPACE_NAME_REGEX: regex::Regex =
regex::Regex::new(r"^[a-zA-Z0-9_\-\.]+$").unwrap();
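// Currently identical to TABLE_NAME_REGEX above; a separate pattern lets the
// namespace naming rules diverge later if needed.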
}
pub trait PatchStoreParam {
@@ -98,6 +100,53 @@ pub fn validate_table_name(name: &str) -> Result<()> {
Ok(())
}
/// Validate a namespace name component
///
/// Namespace names must:
/// - Not be empty
/// - Only contain alphanumeric characters, underscores, hyphens, and periods
///
/// # Arguments
/// * `name` - A single namespace component (not the full path)
///
/// # Returns
/// * `Ok(())` if the namespace name is valid
/// * `Err(Error)` if the namespace name is invalid
pub fn validate_namespace_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(Error::InvalidInput {
message: "Namespace names cannot be empty strings".to_string(),
});
}
if !NAMESPACE_NAME_REGEX.is_match(name) {
return Err(Error::InvalidInput {
message: format!(
"Invalid namespace name '{}': Namespace names can only contain alphanumeric characters, underscores, hyphens, and periods",
name
),
});
}
Ok(())
}
/// Validate all components of a namespace
///
/// Iterates through all namespace components and validates each one.
/// Returns an error if any component is invalid.
///
/// # Arguments
/// * `namespace` - The namespace components to validate
///
/// # Returns
/// * `Ok(())` if all namespace components are valid
/// * `Err(Error)` if any component is invalid
pub fn validate_namespace(namespace: &[String]) -> Result<()> {
for component in namespace {
validate_namespace_name(component)?;
}
Ok(())
}
/// Find one default column to create index or perform vector query.
pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result<String> {
// Try to find a vector column.
@@ -345,6 +394,61 @@ mod tests {
assert!(validate_table_name("name with space").is_err());
}
#[test]
fn test_validate_namespace_name() {
// Valid namespace names
assert!(validate_namespace_name("ns1").is_ok());
assert!(validate_namespace_name("namespace_123").is_ok());
assert!(validate_namespace_name("my-namespace").is_ok());
assert!(validate_namespace_name("my.namespace").is_ok());
assert!(validate_namespace_name("NS_1.2.3").is_ok());
assert!(validate_namespace_name("a").is_ok());
assert!(validate_namespace_name("123").is_ok());
assert!(validate_namespace_name("_underscore").is_ok());
assert!(validate_namespace_name("-hyphen").is_ok());
assert!(validate_namespace_name(".period").is_ok());
// Invalid namespace names
assert!(validate_namespace_name("").is_err());
assert!(validate_namespace_name("namespace with spaces").is_err());
assert!(validate_namespace_name("namespace/with/slashes").is_err());
assert!(validate_namespace_name("namespace\\with\\backslashes").is_err());
assert!(validate_namespace_name("namespace$with$delimiter").is_err());
assert!(validate_namespace_name("namespace@special").is_err());
assert!(validate_namespace_name("namespace#hash").is_err());
}
#[test]
fn test_validate_namespace() {
// Valid namespace with single component
assert!(validate_namespace(&["ns1".to_string()]).is_ok());
// Valid namespace with multiple components
assert!(
validate_namespace(&["ns1".to_string(), "ns2".to_string(), "ns3".to_string()]).is_ok()
);
// Empty namespace (root) is valid
assert!(validate_namespace(&[]).is_ok());
// Invalid: contains empty component
assert!(validate_namespace(&["ns1".to_string(), "".to_string()]).is_err());
// Invalid: contains component with spaces
assert!(validate_namespace(&["ns1".to_string(), "ns 2".to_string()]).is_err());
// Invalid: contains component with special characters
assert!(validate_namespace(&["ns1".to_string(), "ns@2".to_string()]).is_err());
assert!(validate_namespace(&["ns1".to_string(), "ns/2".to_string()]).is_err());
assert!(validate_namespace(&["ns1".to_string(), "ns$2".to_string()]).is_err());
// Valid: underscores, hyphens, and periods are allowed
assert!(
validate_namespace(&["ns_1".to_string(), "ns-2".to_string(), "ns.3".to_string()])
.is_ok()
);
}
#[test]
fn test_string_to_datatype() {
let string = "int32";

View File

@@ -341,10 +341,10 @@ impl EmbeddingFunction for MockEmbed {
fn name(&self) -> &str {
&self.name
}
fn source_type(&self) -> Result<Cow<DataType>> {
fn source_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Borrowed(&self.source_type))
}
fn dest_type(&self) -> Result<Cow<DataType>> {
fn dest_type(&self) -> Result<Cow<'_, DataType>> {
Ok(Cow::Borrowed(&self.dest_type))
}
fn compute_source_embeddings(&self, source: Arc<dyn Array>) -> Result<Arc<dyn Array>> {