mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-25 14:29:56 +00:00
Compare commits
7 Commits
python-v0.
...
v0.22.1-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69cf220bc4 | ||
|
|
ebbeeff4e0 | ||
|
|
407ca53f92 | ||
|
|
ff71d7e552 | ||
|
|
2261eb95a0 | ||
|
|
5b397e410b | ||
|
|
b5a39bffec |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.22.1-beta.1"
|
||||
current_version = "0.22.1-beta.3"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
4
.github/workflows/pypi-publish.yml
vendored
4
.github/workflows/pypi-publish.yml
vendored
@@ -56,7 +56,7 @@ jobs:
|
||||
pypi_token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
mac:
|
||||
timeout-minutes: 60
|
||||
timeout-minutes: 90
|
||||
runs-on: ${{ matrix.config.runner }}
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -64,7 +64,7 @@ jobs:
|
||||
- target: x86_64-apple-darwin
|
||||
runner: macos-13
|
||||
- target: aarch64-apple-darwin
|
||||
runner: macos-14
|
||||
runner: warp-macos-14-arm64-6x
|
||||
env:
|
||||
MACOSX_DEPLOYMENT_TARGET: 10.15
|
||||
steps:
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4628,7 +4628,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.22.1-beta.1"
|
||||
version = "0.22.1-beta.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4715,7 +4715,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.22.1-beta.1"
|
||||
version = "0.22.1-beta.2"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4735,7 +4735,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.25.1-beta.1"
|
||||
version = "0.25.1-beta.2"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.22.1-beta.1</version>
|
||||
<version>0.22.1-beta.3</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.22.1-beta.1</version>
|
||||
<version>0.22.1-beta.3</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.22.1-beta.1</version>
|
||||
<version>0.22.1-beta.3</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.22.1-beta.1"
|
||||
version = "0.22.1-beta.3"
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
@@ -203,3 +203,106 @@ describe("given a connection", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("clone table functionality", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
let db: Connection;
|
||||
beforeEach(async () => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
db = await connect(tmpDir.name);
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
it("should clone a table with latest version (default behavior)", async () => {
|
||||
// Create source table with some data
|
||||
const data = [
|
||||
{ id: 1, text: "hello", vector: [1.0, 2.0] },
|
||||
{ id: 2, text: "world", vector: [3.0, 4.0] },
|
||||
];
|
||||
const sourceTable = await db.createTable("source", data);
|
||||
|
||||
// Add more data to create a new version
|
||||
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
|
||||
await sourceTable.add(moreData);
|
||||
|
||||
// Clone the table (should get latest version with 3 rows)
|
||||
const sourceUri = `${tmpDir.name}/source.lance`;
|
||||
const clonedTable = await db.cloneTable("cloned", sourceUri);
|
||||
|
||||
// Verify cloned table has all 3 rows
|
||||
expect(await clonedTable.countRows()).toBe(3);
|
||||
expect((await db.tableNames()).includes("cloned")).toBe(true);
|
||||
});
|
||||
|
||||
it("should clone a table from a specific version", async () => {
|
||||
// Create source table with initial data
|
||||
const data = [
|
||||
{ id: 1, text: "hello", vector: [1.0, 2.0] },
|
||||
{ id: 2, text: "world", vector: [3.0, 4.0] },
|
||||
];
|
||||
const sourceTable = await db.createTable("source", data);
|
||||
|
||||
// Get the initial version
|
||||
const initialVersion = await sourceTable.version();
|
||||
|
||||
// Add more data to create a new version
|
||||
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
|
||||
await sourceTable.add(moreData);
|
||||
|
||||
// Verify source now has 3 rows
|
||||
expect(await sourceTable.countRows()).toBe(3);
|
||||
|
||||
// Clone from the initial version (should have only 2 rows)
|
||||
const sourceUri = `${tmpDir.name}/source.lance`;
|
||||
const clonedTable = await db.cloneTable("cloned", sourceUri, {
|
||||
sourceVersion: initialVersion,
|
||||
});
|
||||
|
||||
// Verify cloned table has only the initial 2 rows
|
||||
expect(await clonedTable.countRows()).toBe(2);
|
||||
});
|
||||
|
||||
it("should clone a table from a tagged version", async () => {
|
||||
// Create source table with initial data
|
||||
const data = [
|
||||
{ id: 1, text: "hello", vector: [1.0, 2.0] },
|
||||
{ id: 2, text: "world", vector: [3.0, 4.0] },
|
||||
];
|
||||
const sourceTable = await db.createTable("source", data);
|
||||
|
||||
// Create a tag for the current version
|
||||
const tags = await sourceTable.tags();
|
||||
await tags.create("v1.0", await sourceTable.version());
|
||||
|
||||
// Add more data after the tag
|
||||
const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }];
|
||||
await sourceTable.add(moreData);
|
||||
|
||||
// Verify source now has 3 rows
|
||||
expect(await sourceTable.countRows()).toBe(3);
|
||||
|
||||
// Clone from the tagged version (should have only 2 rows)
|
||||
const sourceUri = `${tmpDir.name}/source.lance`;
|
||||
const clonedTable = await db.cloneTable("cloned", sourceUri, {
|
||||
sourceTag: "v1.0",
|
||||
});
|
||||
|
||||
// Verify cloned table has only the tagged version's 2 rows
|
||||
expect(await clonedTable.countRows()).toBe(2);
|
||||
});
|
||||
|
||||
it("should fail when attempting deep clone", async () => {
|
||||
// Create source table with some data
|
||||
const data = [
|
||||
{ id: 1, text: "hello", vector: [1.0, 2.0] },
|
||||
{ id: 2, text: "world", vector: [3.0, 4.0] },
|
||||
];
|
||||
await db.createTable("source", data);
|
||||
|
||||
// Try to create a deep clone (should fail)
|
||||
const sourceUri = `${tmpDir.name}/source.lance`;
|
||||
await expect(
|
||||
db.cloneTable("cloned", sourceUri, { isShallow: false }),
|
||||
).rejects.toThrow("Deep clone is not yet implemented");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -256,6 +256,60 @@ describe("embedding functions", () => {
|
||||
expect(actual).toHaveProperty("text");
|
||||
});
|
||||
|
||||
it("should handle undefined vector field with embedding function correctly", async () => {
|
||||
@register("undefined_test")
|
||||
class MockEmbeddingFunction extends EmbeddingFunction<string> {
|
||||
ndims() {
|
||||
return 3;
|
||||
}
|
||||
embeddingDataType(): Float {
|
||||
return new Float32();
|
||||
}
|
||||
async computeQueryEmbeddings(_data: string) {
|
||||
return [1, 2, 3];
|
||||
}
|
||||
async computeSourceEmbeddings(data: string[]) {
|
||||
return Array.from({ length: data.length }).fill([
|
||||
1, 2, 3,
|
||||
]) as number[][];
|
||||
}
|
||||
}
|
||||
const func = getRegistry()
|
||||
.get<MockEmbeddingFunction>("undefined_test")!
|
||||
.create();
|
||||
const schema = new Schema([
|
||||
new Field("text", new Utf8(), true),
|
||||
new Field(
|
||||
"vector",
|
||||
new FixedSizeList(3, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
|
||||
const db = await connect(tmpDir.name);
|
||||
const table = await db.createEmptyTable("test_undefined", schema, {
|
||||
embeddingFunction: {
|
||||
function: func,
|
||||
sourceColumn: "text",
|
||||
vectorColumn: "vector",
|
||||
},
|
||||
});
|
||||
|
||||
// Test that undefined, null, and omitted vector fields all work
|
||||
await table.add([{ text: "test1", vector: undefined }]);
|
||||
await table.add([{ text: "test2", vector: null }]);
|
||||
await table.add([{ text: "test3" }]);
|
||||
|
||||
const rows = await table.query().toArray();
|
||||
expect(rows.length).toBe(3);
|
||||
|
||||
// All rows should have vectors computed by the embedding function
|
||||
for (const row of rows) {
|
||||
expect(row.vector).toBeDefined();
|
||||
expect(JSON.parse(JSON.stringify(row.vector))).toEqual([1, 2, 3]);
|
||||
}
|
||||
});
|
||||
|
||||
test.each([new Float16(), new Float32(), new Float64()])(
|
||||
"should be able to provide manual embeddings with multiple float datatype",
|
||||
async (floatType) => {
|
||||
|
||||
@@ -512,7 +512,11 @@ function* rowPathsAndValues(
|
||||
if (isObject(value)) {
|
||||
yield* rowPathsAndValues(value, [...basePath, key]);
|
||||
} else {
|
||||
yield [[...basePath, key], value];
|
||||
// Skip undefined values - they should be treated the same as missing fields
|
||||
// for embedding function purposes
|
||||
if (value !== undefined) {
|
||||
yield [[...basePath, key], value];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,6 +268,33 @@ export abstract class Connection {
|
||||
* @param {string[]} namespace The namespace to drop tables from (defaults to root namespace).
|
||||
*/
|
||||
abstract dropAllTables(namespace?: string[]): Promise<void>;
|
||||
|
||||
/**
|
||||
* Clone a table from a source table.
|
||||
*
|
||||
* A shallow clone creates a new table that shares the underlying data files
|
||||
* with the source table but has its own independent manifest. This allows
|
||||
* both the source and cloned tables to evolve independently while initially
|
||||
* sharing the same data, deletion, and index files.
|
||||
*
|
||||
* @param {string} targetTableName - The name of the target table to create.
|
||||
* @param {string} sourceUri - The URI of the source table to clone from.
|
||||
* @param {object} options - Clone options.
|
||||
* @param {string[]} options.targetNamespace - The namespace for the target table (defaults to root namespace).
|
||||
* @param {number} options.sourceVersion - The version of the source table to clone.
|
||||
* @param {string} options.sourceTag - The tag of the source table to clone.
|
||||
* @param {boolean} options.isShallow - Whether to perform a shallow clone (defaults to true).
|
||||
*/
|
||||
abstract cloneTable(
|
||||
targetTableName: string,
|
||||
sourceUri: string,
|
||||
options?: {
|
||||
targetNamespace?: string[];
|
||||
sourceVersion?: number;
|
||||
sourceTag?: string;
|
||||
isShallow?: boolean;
|
||||
},
|
||||
): Promise<Table>;
|
||||
}
|
||||
|
||||
/** @hideconstructor */
|
||||
@@ -332,6 +359,28 @@ export class LocalConnection extends Connection {
|
||||
return new LocalTable(innerTable);
|
||||
}
|
||||
|
||||
async cloneTable(
|
||||
targetTableName: string,
|
||||
sourceUri: string,
|
||||
options?: {
|
||||
targetNamespace?: string[];
|
||||
sourceVersion?: number;
|
||||
sourceTag?: string;
|
||||
isShallow?: boolean;
|
||||
},
|
||||
): Promise<Table> {
|
||||
const innerTable = await this.inner.cloneTable(
|
||||
targetTableName,
|
||||
sourceUri,
|
||||
options?.targetNamespace ?? [],
|
||||
options?.sourceVersion ?? null,
|
||||
options?.sourceTag ?? null,
|
||||
options?.isShallow ?? true,
|
||||
);
|
||||
|
||||
return new LocalTable(innerTable);
|
||||
}
|
||||
|
||||
private getStorageOptions(
|
||||
options?: Partial<CreateTableOptions>,
|
||||
): Record<string, string> | undefined {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-x64",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.darwin-x64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.2",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.2",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.22.1-beta.1",
|
||||
"version": "0.22.1-beta.3",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -213,6 +213,36 @@ impl Connection {
|
||||
Ok(Table::new(tbl))
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn clone_table(
|
||||
&self,
|
||||
target_table_name: String,
|
||||
source_uri: String,
|
||||
target_namespace: Vec<String>,
|
||||
source_version: Option<i64>,
|
||||
source_tag: Option<String>,
|
||||
is_shallow: bool,
|
||||
) -> napi::Result<Table> {
|
||||
let mut builder = self
|
||||
.get_inner()?
|
||||
.clone_table(&target_table_name, &source_uri);
|
||||
|
||||
builder = builder.target_namespace(target_namespace);
|
||||
|
||||
if let Some(version) = source_version {
|
||||
builder = builder.source_version(version as u64);
|
||||
}
|
||||
|
||||
if let Some(tag) = source_tag {
|
||||
builder = builder.source_tag(tag);
|
||||
}
|
||||
|
||||
builder = builder.is_shallow(is_shallow);
|
||||
|
||||
let tbl = builder.execute().await.default_error()?;
|
||||
Ok(Table::new(tbl))
|
||||
}
|
||||
|
||||
/// Drop table with the name. Or raise an error if the table does not exist.
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_table(&self, name: String, namespace: Vec<String>) -> napi::Result<()> {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.25.1-beta.2"
|
||||
current_version = "0.25.1-beta.3"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.25.1-beta.2"
|
||||
version = "0.25.1-beta.3"
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
license.workspace = true
|
||||
|
||||
@@ -60,6 +60,15 @@ class Connection(object):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
) -> Table: ...
|
||||
async def clone_table(
|
||||
self,
|
||||
target_table_name: str,
|
||||
source_uri: str,
|
||||
target_namespace: List[str] = [],
|
||||
source_version: Optional[int] = None,
|
||||
source_tag: Optional[str] = None,
|
||||
is_shallow: bool = True,
|
||||
) -> Table: ...
|
||||
async def rename_table(
|
||||
self,
|
||||
cur_name: str,
|
||||
|
||||
@@ -665,6 +665,60 @@ class LanceDBConnection(DBConnection):
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
target_table_name: str,
|
||||
source_uri: str,
|
||||
*,
|
||||
target_namespace: List[str] = [],
|
||||
source_version: Optional[int] = None,
|
||||
source_tag: Optional[str] = None,
|
||||
is_shallow: bool = True,
|
||||
) -> LanceTable:
|
||||
"""Clone a table from a source table.
|
||||
|
||||
A shallow clone creates a new table that shares the underlying data files
|
||||
with the source table but has its own independent manifest. This allows
|
||||
both the source and cloned tables to evolve independently while initially
|
||||
sharing the same data, deletion, and index files.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
target_table_name: str
|
||||
The name of the target table to create.
|
||||
source_uri: str
|
||||
The URI of the source table to clone from.
|
||||
target_namespace: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
The tag of the source table to clone.
|
||||
is_shallow: bool, default True
|
||||
Whether to perform a shallow clone (True) or deep clone (False).
|
||||
Currently only shallow clone is supported.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A LanceTable object representing the cloned table.
|
||||
"""
|
||||
LOOP.run(
|
||||
self._conn.clone_table(
|
||||
target_table_name,
|
||||
source_uri,
|
||||
target_namespace=target_namespace,
|
||||
source_version=source_version,
|
||||
source_tag=source_tag,
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
)
|
||||
return LanceTable.open(
|
||||
self,
|
||||
target_table_name,
|
||||
namespace=target_namespace,
|
||||
)
|
||||
|
||||
@override
|
||||
def drop_table(
|
||||
self,
|
||||
@@ -1136,6 +1190,54 @@ class AsyncConnection(object):
|
||||
)
|
||||
return AsyncTable(table)
|
||||
|
||||
async def clone_table(
|
||||
self,
|
||||
target_table_name: str,
|
||||
source_uri: str,
|
||||
*,
|
||||
target_namespace: List[str] = [],
|
||||
source_version: Optional[int] = None,
|
||||
source_tag: Optional[str] = None,
|
||||
is_shallow: bool = True,
|
||||
) -> AsyncTable:
|
||||
"""Clone a table from a source table.
|
||||
|
||||
A shallow clone creates a new table that shares the underlying data files
|
||||
with the source table but has its own independent manifest. This allows
|
||||
both the source and cloned tables to evolve independently while initially
|
||||
sharing the same data, deletion, and index files.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
target_table_name: str
|
||||
The name of the target table to create.
|
||||
source_uri: str
|
||||
The URI of the source table to clone from.
|
||||
target_namespace: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
The tag of the source table to clone.
|
||||
is_shallow: bool, default True
|
||||
Whether to perform a shallow clone (True) or deep clone (False).
|
||||
Currently only shallow clone is supported.
|
||||
|
||||
Returns
|
||||
-------
|
||||
An AsyncTable object representing the cloned table.
|
||||
"""
|
||||
table = await self._inner.clone_table(
|
||||
target_table_name,
|
||||
source_uri,
|
||||
target_namespace=target_namespace,
|
||||
source_version=source_version,
|
||||
source_tag=source_tag,
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
return AsyncTable(table)
|
||||
|
||||
async def rename_table(
|
||||
self,
|
||||
cur_name: str,
|
||||
|
||||
@@ -212,6 +212,53 @@ class RemoteDBConnection(DBConnection):
|
||||
table = LOOP.run(self._conn.open_table(name, namespace=namespace))
|
||||
return RemoteTable(table, self.db_name)
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
target_table_name: str,
|
||||
source_uri: str,
|
||||
*,
|
||||
target_namespace: List[str] = [],
|
||||
source_version: Optional[int] = None,
|
||||
source_tag: Optional[str] = None,
|
||||
is_shallow: bool = True,
|
||||
) -> Table:
|
||||
"""Clone a table from a source table.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
target_table_name: str
|
||||
The name of the target table to create.
|
||||
source_uri: str
|
||||
The URI of the source table to clone from.
|
||||
target_namespace: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
The tag of the source table to clone.
|
||||
is_shallow: bool, default True
|
||||
Whether to perform a shallow clone (True) or deep clone (False).
|
||||
Currently only shallow clone is supported.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A RemoteTable object representing the cloned table.
|
||||
"""
|
||||
from .table import RemoteTable
|
||||
|
||||
table = LOOP.run(
|
||||
self._conn.clone_table(
|
||||
target_table_name,
|
||||
source_uri,
|
||||
target_namespace=target_namespace,
|
||||
source_version=source_version,
|
||||
source_tag=source_tag,
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
self,
|
||||
|
||||
@@ -747,15 +747,16 @@ def test_local_namespace_operations(tmp_path):
|
||||
# Create a local database connection
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
# Test list_namespaces returns empty list
|
||||
# Test list_namespaces returns empty list for root namespace
|
||||
namespaces = list(db.list_namespaces())
|
||||
assert namespaces == []
|
||||
|
||||
# Test list_namespaces with parameters still returns empty list
|
||||
namespaces_with_params = list(
|
||||
db.list_namespaces(namespace=["test"], page_token="token", limit=5)
|
||||
)
|
||||
assert namespaces_with_params == []
|
||||
# Test list_namespaces with non-empty namespace raises NotImplementedError
|
||||
with pytest.raises(
|
||||
NotImplementedError,
|
||||
match="Namespace operations are not supported for listing database",
|
||||
):
|
||||
list(db.list_namespaces(namespace=["test"]))
|
||||
|
||||
|
||||
def test_local_create_namespace_not_supported(tmp_path):
|
||||
@@ -830,3 +831,119 @@ def test_local_table_operations_with_namespace_raise_error(tmp_path):
|
||||
# Test table_names without namespace - should work normally
|
||||
tables_root = list(db.table_names())
|
||||
assert "test_table" in tables_root
|
||||
|
||||
|
||||
def test_clone_table_latest_version(tmp_path):
|
||||
"""Test cloning a table with the latest version (default behavior)"""
|
||||
import os
|
||||
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
# Create source table with some data
|
||||
data = [
|
||||
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
|
||||
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
|
||||
]
|
||||
source_table = db.create_table("source", data=data)
|
||||
|
||||
# Add more data to create a new version
|
||||
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
|
||||
source_table.add(more_data)
|
||||
|
||||
# Clone the table (should get latest version with 3 rows)
|
||||
source_uri = os.path.join(tmp_path, "source.lance")
|
||||
cloned_table = db.clone_table("cloned", source_uri)
|
||||
|
||||
# Verify cloned table has all 3 rows
|
||||
assert cloned_table.count_rows() == 3
|
||||
assert "cloned" in db.table_names()
|
||||
|
||||
# Verify data matches
|
||||
cloned_data = cloned_table.to_pandas()
|
||||
assert len(cloned_data) == 3
|
||||
assert set(cloned_data["id"].tolist()) == {1, 2, 3}
|
||||
|
||||
|
||||
def test_clone_table_specific_version(tmp_path):
|
||||
"""Test cloning a table from a specific version"""
|
||||
import os
|
||||
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
# Create source table with initial data
|
||||
data = [
|
||||
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
|
||||
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
|
||||
]
|
||||
source_table = db.create_table("source", data=data)
|
||||
|
||||
# Get the initial version
|
||||
initial_version = source_table.version
|
||||
|
||||
# Add more data to create a new version
|
||||
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
|
||||
source_table.add(more_data)
|
||||
|
||||
# Verify source now has 3 rows
|
||||
assert source_table.count_rows() == 3
|
||||
|
||||
# Clone from the initial version (should have only 2 rows)
|
||||
source_uri = os.path.join(tmp_path, "source.lance")
|
||||
cloned_table = db.clone_table("cloned", source_uri, source_version=initial_version)
|
||||
|
||||
# Verify cloned table has only the initial 2 rows
|
||||
assert cloned_table.count_rows() == 2
|
||||
cloned_data = cloned_table.to_pandas()
|
||||
assert set(cloned_data["id"].tolist()) == {1, 2}
|
||||
|
||||
|
||||
def test_clone_table_with_tag(tmp_path):
|
||||
"""Test cloning a table from a tagged version"""
|
||||
import os
|
||||
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
# Create source table with initial data
|
||||
data = [
|
||||
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
|
||||
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
|
||||
]
|
||||
source_table = db.create_table("source", data=data)
|
||||
|
||||
# Create a tag for the current version
|
||||
source_table.tags.create("v1.0", source_table.version)
|
||||
|
||||
# Add more data after the tag
|
||||
more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}]
|
||||
source_table.add(more_data)
|
||||
|
||||
# Verify source now has 3 rows
|
||||
assert source_table.count_rows() == 3
|
||||
|
||||
# Clone from the tagged version (should have only 2 rows)
|
||||
source_uri = os.path.join(tmp_path, "source.lance")
|
||||
cloned_table = db.clone_table("cloned", source_uri, source_tag="v1.0")
|
||||
|
||||
# Verify cloned table has only the tagged version's 2 rows
|
||||
assert cloned_table.count_rows() == 2
|
||||
cloned_data = cloned_table.to_pandas()
|
||||
assert set(cloned_data["id"].tolist()) == {1, 2}
|
||||
|
||||
|
||||
def test_clone_table_deep_clone_fails(tmp_path):
|
||||
"""Test that deep clone raises an unsupported error"""
|
||||
import os
|
||||
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
# Create source table with some data
|
||||
data = [
|
||||
{"id": 1, "text": "hello", "vector": [1.0, 2.0]},
|
||||
{"id": 2, "text": "world", "vector": [3.0, 4.0]},
|
||||
]
|
||||
db.create_table("source", data=data)
|
||||
|
||||
# Try to create a deep clone (should fail)
|
||||
source_uri = os.path.join(tmp_path, "source.lance")
|
||||
with pytest.raises(Exception, match="Deep clone is not yet implemented"):
|
||||
db.clone_table("cloned", source_uri, is_shallow=False)
|
||||
|
||||
@@ -163,6 +163,34 @@ impl Connection {
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (target_table_name, source_uri, target_namespace=vec![], source_version=None, source_tag=None, is_shallow=true))]
|
||||
pub fn clone_table(
|
||||
self_: PyRef<'_, Self>,
|
||||
target_table_name: String,
|
||||
source_uri: String,
|
||||
target_namespace: Vec<String>,
|
||||
source_version: Option<u64>,
|
||||
source_tag: Option<String>,
|
||||
is_shallow: bool,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.get_inner()?.clone();
|
||||
|
||||
let mut builder = inner.clone_table(target_table_name, source_uri);
|
||||
builder = builder.target_namespace(target_namespace);
|
||||
if let Some(version) = source_version {
|
||||
builder = builder.source_version(version);
|
||||
}
|
||||
if let Some(tag) = source_tag {
|
||||
builder = builder.source_tag(tag);
|
||||
}
|
||||
builder = builder.is_shallow(is_shallow);
|
||||
|
||||
future_into_py(self_.py(), async move {
|
||||
let table = builder.execute().await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (cur_name, new_name, cur_namespace=vec![], new_namespace=vec![]))]
|
||||
pub fn rename_table(
|
||||
self_: PyRef<'_, Self>,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.22.1-beta.1"
|
||||
version = "0.22.1-beta.3"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
|
||||
@@ -17,9 +17,9 @@ use crate::database::listing::{
|
||||
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
||||
};
|
||||
use crate::database::{
|
||||
CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
|
||||
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
|
||||
TableNamesRequest,
|
||||
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
|
||||
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
|
||||
OpenTableRequest, TableNamesRequest,
|
||||
};
|
||||
use crate::embeddings::{
|
||||
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings,
|
||||
@@ -469,6 +469,62 @@ impl OpenTableBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for cloning a table.
|
||||
///
|
||||
/// A shallow clone creates a new table that shares the underlying data files
|
||||
/// with the source table but has its own independent manifest. Both the source
|
||||
/// and cloned tables can evolve independently while initially sharing the same
|
||||
/// data, deletion, and index files.
|
||||
///
|
||||
/// Use this builder to configure the clone operation before executing it.
|
||||
pub struct CloneTableBuilder {
|
||||
parent: Arc<dyn Database>,
|
||||
request: CloneTableRequest,
|
||||
}
|
||||
|
||||
impl CloneTableBuilder {
|
||||
fn new(parent: Arc<dyn Database>, target_table_name: String, source_uri: String) -> Self {
|
||||
Self {
|
||||
parent,
|
||||
request: CloneTableRequest::new(target_table_name, source_uri),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the source version to clone from
|
||||
pub fn source_version(mut self, version: u64) -> Self {
|
||||
self.request.source_version = Some(version);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the source tag to clone from
|
||||
pub fn source_tag(mut self, tag: impl Into<String>) -> Self {
|
||||
self.request.source_tag = Some(tag.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the target namespace for the cloned table
|
||||
pub fn target_namespace(mut self, namespace: Vec<String>) -> Self {
|
||||
self.request.target_namespace = namespace;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to perform a shallow clone (default: true)
|
||||
///
|
||||
/// When true, the cloned table shares data files with the source table.
|
||||
/// When false, performs a deep clone (not yet implemented).
|
||||
pub fn is_shallow(mut self, is_shallow: bool) -> Self {
|
||||
self.request.is_shallow = is_shallow;
|
||||
self
|
||||
}
|
||||
|
||||
/// Execute the clone operation
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
Ok(Table::new(
|
||||
self.parent.clone().clone_table(self.request).await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// A connection to LanceDB
|
||||
#[derive(Clone)]
|
||||
pub struct Connection {
|
||||
@@ -575,6 +631,30 @@ impl Connection {
|
||||
)
|
||||
}
|
||||
|
||||
/// Clone a table in the database
|
||||
///
|
||||
/// Creates a new table by cloning from an existing source table.
|
||||
/// By default, this performs a shallow clone where the new table shares
|
||||
/// the underlying data files with the source table.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `target_table_name`: The name of the new table to create
|
||||
/// - `source_uri`: The URI of the source table to clone from
|
||||
///
|
||||
/// # Returns
|
||||
/// A [`CloneTableBuilder`] that can be used to configure the clone operation
|
||||
pub fn clone_table(
|
||||
&self,
|
||||
target_table_name: impl Into<String>,
|
||||
source_uri: impl Into<String>,
|
||||
) -> CloneTableBuilder {
|
||||
CloneTableBuilder::new(
|
||||
self.internal.clone(),
|
||||
target_table_name.into(),
|
||||
source_uri.into(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Rename a table in the database.
|
||||
///
|
||||
/// This is only supported in LanceDB Cloud.
|
||||
@@ -1281,4 +1361,50 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(other_schema, overwritten.schema().await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let db = connect(uri).execute().await.unwrap();
|
||||
|
||||
// Create a source table with some data
|
||||
let mut batch_gen = BatchGenerator::new()
|
||||
.col(Box::new(IncrementingInt32::new().named("id")))
|
||||
.col(Box::new(IncrementingInt32::new().named("value")));
|
||||
let reader = batch_gen.batches(5, 100);
|
||||
|
||||
let source_table = db
|
||||
.create_table("source_table", reader)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Get the source table URI
|
||||
let source_table_path = tmp_dir.path().join("source_table.lance");
|
||||
let source_uri = source_table_path.to_str().unwrap();
|
||||
|
||||
// Clone the table
|
||||
let cloned_table = db
|
||||
.clone_table("cloned_table", source_uri)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify the cloned table exists
|
||||
let table_names = db.table_names().execute().await.unwrap();
|
||||
assert!(table_names.contains(&"source_table".to_string()));
|
||||
assert!(table_names.contains(&"cloned_table".to_string()));
|
||||
|
||||
// Verify the cloned table has the same schema
|
||||
assert_eq!(
|
||||
source_table.schema().await.unwrap(),
|
||||
cloned_table.schema().await.unwrap()
|
||||
);
|
||||
|
||||
// Verify the cloned table has the same data
|
||||
let source_count = source_table.count_rows(None).await.unwrap();
|
||||
let cloned_count = cloned_table.count_rows(None).await.unwrap();
|
||||
assert_eq!(source_count, cloned_count);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,6 +176,42 @@ impl CreateTableRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Request to clone a table from a source table.
|
||||
///
|
||||
/// A shallow clone creates a new table that shares the underlying data files
|
||||
/// with the source table but has its own independent manifest. This allows
|
||||
/// both the source and cloned tables to evolve independently while initially
|
||||
/// sharing the same data, deletion, and index files.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CloneTableRequest {
|
||||
/// The name of the target table to create
|
||||
pub target_table_name: String,
|
||||
/// The namespace for the target table. Empty list represents root namespace.
|
||||
pub target_namespace: Vec<String>,
|
||||
/// The URI of the source table to clone from.
|
||||
pub source_uri: String,
|
||||
/// Optional version of the source table to clone.
|
||||
pub source_version: Option<u64>,
|
||||
/// Optional tag of the source table to clone.
|
||||
pub source_tag: Option<String>,
|
||||
/// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true.
|
||||
/// Currently only shallow clone is supported.
|
||||
pub is_shallow: bool,
|
||||
}
|
||||
|
||||
impl CloneTableRequest {
|
||||
pub fn new(target_table_name: String, source_uri: String) -> Self {
|
||||
Self {
|
||||
target_table_name,
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The `Database` trait defines the interface for database implementations.
|
||||
///
|
||||
/// A database is responsible for managing tables and their metadata.
|
||||
@@ -193,6 +229,13 @@ pub trait Database:
|
||||
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>>;
|
||||
/// Create a table in the database
|
||||
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Clone a table in the database.
|
||||
///
|
||||
/// Creates a shallow clone of the source table, sharing underlying data files
|
||||
/// but with an independent manifest. Both tables can evolve separately after cloning.
|
||||
///
|
||||
/// See [`CloneTableRequest`] for detailed documentation and examples.
|
||||
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Open a table in the database
|
||||
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Rename a table in the database
|
||||
|
||||
@@ -7,7 +7,8 @@ use std::fs::create_dir_all;
|
||||
use std::path::Path;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use lance::dataset::{ReadParams, WriteMode};
|
||||
use lance::dataset::refs::Ref;
|
||||
use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode};
|
||||
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
|
||||
use lance_datafusion::utils::StreamingWriteSource;
|
||||
use lance_encoding::version::LanceFileVersion;
|
||||
@@ -22,8 +23,8 @@ use crate::table::NativeTable;
|
||||
use crate::utils::validate_table_name;
|
||||
|
||||
use super::{
|
||||
BaseTable, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, Database,
|
||||
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
|
||||
BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest,
|
||||
Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
|
||||
TableNamesRequest,
|
||||
};
|
||||
|
||||
@@ -684,6 +685,65 @@ impl Database for ListingDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
if !request.target_namespace.is_empty() {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: support deep clone
|
||||
if !request.is_shallow {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Deep clone is not yet implemented".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
validate_table_name(&request.target_table_name)?;
|
||||
|
||||
let storage_params = ObjectStoreParams {
|
||||
storage_options: Some(self.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let read_params = ReadParams {
|
||||
store_options: Some(storage_params.clone()),
|
||||
session: Some(self.session.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut source_dataset = DatasetBuilder::from_uri(&request.source_uri)
|
||||
.with_read_params(read_params.clone())
|
||||
.load()
|
||||
.await
|
||||
.map_err(|e| Error::Lance { source: e })?;
|
||||
|
||||
let version_ref = match (request.source_version, request.source_tag) {
|
||||
(Some(v), None) => Ok(Ref::Version(v)),
|
||||
(None, Some(tag)) => Ok(Ref::Tag(tag)),
|
||||
(None, None) => Ok(Ref::Version(source_dataset.version().version)),
|
||||
_ => Err(Error::InvalidInput {
|
||||
message: "Cannot specify both source_version and source_tag".to_string(),
|
||||
}),
|
||||
}?;
|
||||
|
||||
let target_uri = self.table_uri(&request.target_table_name)?;
|
||||
source_dataset
|
||||
.shallow_clone(&target_uri, version_ref, storage_params)
|
||||
.await
|
||||
.map_err(|e| Error::Lance { source: e })?;
|
||||
|
||||
let cloned_table = NativeTable::open_with_params(
|
||||
&target_uri,
|
||||
&request.target_table_name,
|
||||
self.store_wrapper.clone(),
|
||||
None,
|
||||
self.read_consistency_interval,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Arc::new(cloned_table))
|
||||
}
|
||||
|
||||
async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
if !request.namespace.is_empty() {
|
||||
return Err(Error::NotSupported {
|
||||
@@ -785,3 +845,694 @@ impl Database for ListingDatabase {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::connection::ConnectRequest;
|
||||
use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest};
|
||||
use crate::table::{Table, TableDefinition};
|
||||
use arrow_array::{Int32Array, RecordBatch, StringArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use tempfile::tempdir;
|
||||
|
||||
async fn setup_database() -> (tempfile::TempDir, ListingDatabase) {
|
||||
let tempdir = tempdir().unwrap();
|
||||
let uri = tempdir.path().to_str().unwrap();
|
||||
|
||||
let request = ConnectRequest {
|
||||
uri: uri.to_string(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
|
||||
let db = ListingDatabase::connect_with_options(&request)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
(tempdir, db)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_basic() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with schema
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("name", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "source_table".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema.clone())),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Get the source table URI
|
||||
let source_uri = db.table_uri("source_table").unwrap();
|
||||
|
||||
// Clone the table
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned_table".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri: source_uri.clone(),
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify both tables exist
|
||||
let table_names = db.table_names(TableNamesRequest::default()).await.unwrap();
|
||||
assert!(table_names.contains(&"source_table".to_string()));
|
||||
assert!(table_names.contains(&"cloned_table".to_string()));
|
||||
|
||||
// Verify schemas match
|
||||
assert_eq!(
|
||||
source_table.schema().await.unwrap(),
|
||||
cloned_table.schema().await.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_data() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with actual data
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("name", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let reader = Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch)],
|
||||
schema.clone(),
|
||||
));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "source_with_data".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Data(reader),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source_with_data").unwrap();
|
||||
|
||||
// Clone the table
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned_with_data".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify data counts match
|
||||
let source_count = source_table.count_rows(None).await.unwrap();
|
||||
let cloned_count = cloned_table.count_rows(None).await.unwrap();
|
||||
assert_eq!(source_count, cloned_count);
|
||||
assert_eq!(source_count, 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_storage_options() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
let uri = tempdir.path().to_str().unwrap();
|
||||
|
||||
// Create database with storage options
|
||||
let mut options = HashMap::new();
|
||||
options.insert("test_option".to_string(), "test_value".to_string());
|
||||
|
||||
let request = ConnectRequest {
|
||||
uri: uri.to_string(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
options: options.clone(),
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
|
||||
let db = ListingDatabase::connect_with_options(&request)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create source table
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
db.create_table(CreateTableRequest {
|
||||
name: "source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source").unwrap();
|
||||
|
||||
// Clone should work with storage options
|
||||
let cloned = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(cloned.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_deep_not_supported() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
db.create_table(CreateTableRequest {
|
||||
name: "source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source").unwrap();
|
||||
|
||||
// Try deep clone (should fail)
|
||||
let result = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: false, // Request deep clone
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
Error::NotSupported { message } if message.contains("Deep clone")
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_namespace_not_supported() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
db.create_table(CreateTableRequest {
|
||||
name: "source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source").unwrap();
|
||||
|
||||
// Try clone with namespace (should fail for listing database)
|
||||
let result = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned".to_string(),
|
||||
target_namespace: vec!["namespace".to_string()], // Non-empty namespace
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
Error::NotSupported { message } if message.contains("Namespace parameter is not supported")
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_invalid_target_name() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
db.create_table(CreateTableRequest {
|
||||
name: "source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source").unwrap();
|
||||
|
||||
// Try clone with invalid target name
|
||||
let result = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "invalid/name".to_string(), // Invalid name with slash
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_source_not_found() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Try to clone from non-existent source
|
||||
let result = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri: "/nonexistent/table.lance".to_string(),
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_version_and_tag_error() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
db.create_table(CreateTableRequest {
|
||||
name: "source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("source").unwrap();
|
||||
|
||||
// Try clone with both version and tag (should fail)
|
||||
let result = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: Some(1),
|
||||
source_tag: Some("v1.0".to_string()),
|
||||
is_shallow: true,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
Error::InvalidInput { message } if message.contains("Cannot specify both source_version and source_tag")
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_specific_version() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with initial data
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("value", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let batch1 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![1, 2])),
|
||||
Arc::new(StringArray::from(vec!["a", "b"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let reader = Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch1)],
|
||||
schema.clone(),
|
||||
));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "versioned_source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Data(reader),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Get the initial version
|
||||
let initial_version = source_table.version().await.unwrap();
|
||||
|
||||
// Add more data to create a new version
|
||||
let batch2 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![3, 4])),
|
||||
Arc::new(StringArray::from(vec!["c", "d"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let source_table_obj = Table::new(source_table.clone());
|
||||
source_table_obj
|
||||
.add(Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch2)],
|
||||
schema.clone(),
|
||||
)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify source table now has 4 rows
|
||||
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
let source_uri = db.table_uri("versioned_source").unwrap();
|
||||
|
||||
// Clone from the initial version (should have only 2 rows)
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned_from_version".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: Some(initial_version),
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify cloned table has only the initial 2 rows
|
||||
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// Source table should still have 4 rows
|
||||
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_tag() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with initial data
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("value", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let batch1 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![1, 2])),
|
||||
Arc::new(StringArray::from(vec!["a", "b"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let reader = Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch1)],
|
||||
schema.clone(),
|
||||
));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "tagged_source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Data(reader),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create a tag for the current version
|
||||
let source_table_obj = Table::new(source_table.clone());
|
||||
let mut tags = source_table_obj.tags().await.unwrap();
|
||||
tags.create("v1.0", source_table.version().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Add more data after the tag
|
||||
let batch2 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![3, 4])),
|
||||
Arc::new(StringArray::from(vec!["c", "d"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let source_table_obj = Table::new(source_table.clone());
|
||||
source_table_obj
|
||||
.add(Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch2)],
|
||||
schema.clone(),
|
||||
)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Source table should have 4 rows
|
||||
assert_eq!(source_table.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
let source_uri = db.table_uri("tagged_source").unwrap();
|
||||
|
||||
// Clone from the tag (should have only 2 rows)
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned_from_tag".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: Some("v1.0".to_string()),
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify cloned table has only the tagged version's 2 rows
|
||||
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cloned_tables_evolve_independently() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with initial data
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("value", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let batch1 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![1, 2])),
|
||||
Arc::new(StringArray::from(vec!["a", "b"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let reader = Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch1)],
|
||||
schema.clone(),
|
||||
));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "independent_source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Data(reader),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let source_uri = db.table_uri("independent_source").unwrap();
|
||||
|
||||
// Clone the table
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "independent_clone".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Both should start with 2 rows
|
||||
assert_eq!(source_table.count_rows(None).await.unwrap(), 2);
|
||||
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// Add data to the cloned table
|
||||
let batch_clone = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![3, 4, 5])),
|
||||
Arc::new(StringArray::from(vec!["c", "d", "e"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cloned_table_obj = Table::new(cloned_table.clone());
|
||||
cloned_table_obj
|
||||
.add(Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch_clone)],
|
||||
schema.clone(),
|
||||
)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Add different data to the source table
|
||||
let batch_source = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int32Array::from(vec![10, 11])),
|
||||
Arc::new(StringArray::from(vec!["x", "y"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let source_table_obj = Table::new(source_table.clone());
|
||||
source_table_obj
|
||||
.add(Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch_source)],
|
||||
schema.clone(),
|
||||
)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify they have evolved independently
|
||||
assert_eq!(source_table.count_rows(None).await.unwrap(), 4); // 2 + 2
|
||||
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 5); // 2 + 3
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_latest_version() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
// Create a source table with initial data
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
|
||||
let batch1 =
|
||||
RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])
|
||||
.unwrap();
|
||||
|
||||
let reader = Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch1)],
|
||||
schema.clone(),
|
||||
));
|
||||
|
||||
let source_table = db
|
||||
.create_table(CreateTableRequest {
|
||||
name: "latest_version_source".to_string(),
|
||||
namespace: vec![],
|
||||
data: CreateTableData::Data(reader),
|
||||
mode: CreateTableMode::Create,
|
||||
write_options: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Add more data to create new versions
|
||||
for i in 0..3 {
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Array::from(vec![i * 10, i * 10 + 1]))],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let source_table_obj = Table::new(source_table.clone());
|
||||
source_table_obj
|
||||
.add(Box::new(arrow_array::RecordBatchIterator::new(
|
||||
vec![Ok(batch)],
|
||||
schema.clone(),
|
||||
)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Source should have 8 rows total (2 + 2 + 2 + 2)
|
||||
let source_count = source_table.count_rows(None).await.unwrap();
|
||||
assert_eq!(source_count, 8);
|
||||
|
||||
let source_uri = db.table_uri("latest_version_source").unwrap();
|
||||
|
||||
// Clone without specifying version or tag (should get latest)
|
||||
let cloned_table = db
|
||||
.clone_table(CloneTableRequest {
|
||||
target_table_name: "cloned_latest".to_string(),
|
||||
target_namespace: vec![],
|
||||
source_uri,
|
||||
source_version: None,
|
||||
source_tag: None,
|
||||
is_shallow: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Cloned table should have all 8 rows from the latest version
|
||||
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 8);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,9 +14,9 @@ use serde::Deserialize;
|
||||
use tokio::task::spawn_blocking;
|
||||
|
||||
use crate::database::{
|
||||
CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database,
|
||||
DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest,
|
||||
TableNamesRequest,
|
||||
CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode,
|
||||
CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest,
|
||||
OpenTableRequest, TableNamesRequest,
|
||||
};
|
||||
use crate::error::Result;
|
||||
use crate::table::BaseTable;
|
||||
@@ -27,6 +27,18 @@ use super::table::RemoteTable;
|
||||
use super::util::{batches_to_ipc_bytes, parse_server_version};
|
||||
use super::ARROW_STREAM_CONTENT_TYPE;
|
||||
|
||||
// Request structure for the remote clone table API
|
||||
#[derive(serde::Serialize)]
|
||||
struct RemoteCloneTableRequest {
|
||||
source_location: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
source_version: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
source_tag: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
is_shallow: Option<bool>,
|
||||
}
|
||||
|
||||
// the versions of the server that we support
|
||||
// for any new feature that we need to change the SDK behavior, we should bump the server version,
|
||||
// and add a feature flag as method of `ServerVersion` here.
|
||||
@@ -430,6 +442,51 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
let table_identifier = build_table_identifier(
|
||||
&request.target_table_name,
|
||||
&request.target_namespace,
|
||||
&self.client.id_delimiter,
|
||||
);
|
||||
|
||||
let remote_request = RemoteCloneTableRequest {
|
||||
source_location: request.source_uri,
|
||||
source_version: request.source_version,
|
||||
source_tag: request.source_tag,
|
||||
is_shallow: Some(request.is_shallow),
|
||||
};
|
||||
|
||||
let req = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/clone", table_identifier.clone()))
|
||||
.json(&remote_request);
|
||||
|
||||
let (request_id, rsp) = self.client.send(req).await?;
|
||||
|
||||
let status = rsp.status();
|
||||
if status != StatusCode::OK {
|
||||
let body = rsp.text().await.err_to_http(request_id.clone())?;
|
||||
return Err(crate::Error::Http {
|
||||
source: format!("Failed to clone table: {}", body).into(),
|
||||
request_id,
|
||||
status_code: Some(status),
|
||||
});
|
||||
}
|
||||
|
||||
let version = parse_server_version(&request_id, &rsp)?;
|
||||
let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace);
|
||||
let table = Arc::new(RemoteTable::new(
|
||||
self.client.clone(),
|
||||
request.target_table_name.clone(),
|
||||
request.target_namespace.clone(),
|
||||
table_identifier,
|
||||
version,
|
||||
));
|
||||
self.table_cache.insert(cache_key, table.clone()).await;
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
let identifier =
|
||||
build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter);
|
||||
@@ -1221,4 +1278,146 @@ mod tests {
|
||||
_ => panic!("Expected Runtime error from header provider"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["source_location"], "s3://bucket/source_table");
|
||||
assert_eq!(body["is_shallow"], true);
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
});
|
||||
|
||||
let table = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.name(), "cloned_table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_version() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["source_location"], "s3://bucket/source_table");
|
||||
assert_eq!(body["source_version"], 42);
|
||||
assert_eq!(body["is_shallow"], true);
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
});
|
||||
|
||||
let table = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.source_version(42)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.name(), "cloned_table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_tag() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["source_location"], "s3://bucket/source_table");
|
||||
assert_eq!(body["source_tag"], "v1.0");
|
||||
assert_eq!(body["is_shallow"], true);
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
});
|
||||
|
||||
let table = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.source_tag("v1.0")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.name(), "cloned_table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_deep_clone() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/table/cloned_table/clone");
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["source_location"], "s3://bucket/source_table");
|
||||
assert_eq!(body["is_shallow"], false);
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
});
|
||||
|
||||
let table = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.is_shallow(false)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.name(), "cloned_table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_with_namespace() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/table/ns1$ns2$cloned_table/clone");
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["source_location"], "s3://bucket/source_table");
|
||||
assert_eq!(body["is_shallow"], true);
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
});
|
||||
|
||||
let table = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.target_namespace(vec!["ns1".to_string(), "ns2".to_string()])
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.name(), "cloned_table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_clone_table_error() {
|
||||
let conn = Connection::new_with_handler(|_| {
|
||||
http::Response::builder()
|
||||
.status(500)
|
||||
.body("Internal server error")
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = conn
|
||||
.clone_table("cloned_table", "s3://bucket/source_table")
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
if let Err(crate::Error::Http { source, .. }) = result {
|
||||
assert!(source.to_string().contains("Failed to clone table"));
|
||||
} else {
|
||||
panic!("Expected HTTP error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user