diff --git a/nodejs/__test__/connection.test.ts b/nodejs/__test__/connection.test.ts index 7fcdb85a..b8e5def8 100644 --- a/nodejs/__test__/connection.test.ts +++ b/nodejs/__test__/connection.test.ts @@ -203,3 +203,106 @@ describe("given a connection", () => { }); }); }); + +describe("clone table functionality", () => { + let tmpDir: tmp.DirResult; + let db: Connection; + beforeEach(async () => { + tmpDir = tmp.dirSync({ unsafeCleanup: true }); + db = await connect(tmpDir.name); + }); + afterEach(() => tmpDir.removeCallback()); + + it("should clone a table with latest version (default behavior)", async () => { + // Create source table with some data + const data = [ + { id: 1, text: "hello", vector: [1.0, 2.0] }, + { id: 2, text: "world", vector: [3.0, 4.0] }, + ]; + const sourceTable = await db.createTable("source", data); + + // Add more data to create a new version + const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }]; + await sourceTable.add(moreData); + + // Clone the table (should get latest version with 3 rows) + const sourceUri = `${tmpDir.name}/source.lance`; + const clonedTable = await db.cloneTable("cloned", sourceUri); + + // Verify cloned table has all 3 rows + expect(await clonedTable.countRows()).toBe(3); + expect((await db.tableNames()).includes("cloned")).toBe(true); + }); + + it("should clone a table from a specific version", async () => { + // Create source table with initial data + const data = [ + { id: 1, text: "hello", vector: [1.0, 2.0] }, + { id: 2, text: "world", vector: [3.0, 4.0] }, + ]; + const sourceTable = await db.createTable("source", data); + + // Get the initial version + const initialVersion = await sourceTable.version(); + + // Add more data to create a new version + const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }]; + await sourceTable.add(moreData); + + // Verify source now has 3 rows + expect(await sourceTable.countRows()).toBe(3); + + // Clone from the initial version (should have only 2 rows) + const sourceUri = `${tmpDir.name}/source.lance`; + const clonedTable = await db.cloneTable("cloned", sourceUri, { + sourceVersion: initialVersion, + }); + + // Verify cloned table has only the initial 2 rows + expect(await clonedTable.countRows()).toBe(2); + }); + + it("should clone a table from a tagged version", async () => { + // Create source table with initial data + const data = [ + { id: 1, text: "hello", vector: [1.0, 2.0] }, + { id: 2, text: "world", vector: [3.0, 4.0] }, + ]; + const sourceTable = await db.createTable("source", data); + + // Create a tag for the current version + const tags = await sourceTable.tags(); + await tags.create("v1.0", await sourceTable.version()); + + // Add more data after the tag + const moreData = [{ id: 3, text: "test", vector: [5.0, 6.0] }]; + await sourceTable.add(moreData); + + // Verify source now has 3 rows + expect(await sourceTable.countRows()).toBe(3); + + // Clone from the tagged version (should have only 2 rows) + const sourceUri = `${tmpDir.name}/source.lance`; + const clonedTable = await db.cloneTable("cloned", sourceUri, { + sourceTag: "v1.0", + }); + + // Verify cloned table has only the tagged version's 2 rows + expect(await clonedTable.countRows()).toBe(2); + }); + + it("should fail when attempting deep clone", async () => { + // Create source table with some data + const data = [ + { id: 1, text: "hello", vector: [1.0, 2.0] }, + { id: 2, text: "world", vector: [3.0, 4.0] }, + ]; + await db.createTable("source", data); + + // Try to create a deep clone (should fail) + const 
sourceUri = `${tmpDir.name}/source.lance`; + await expect( + db.cloneTable("cloned", sourceUri, { isShallow: false }), + ).rejects.toThrow("Deep clone is not yet implemented"); + }); +}); diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 8b3aefdd..2d29cf68 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -268,6 +268,33 @@ export abstract class Connection { * @param {string[]} namespace The namespace to drop tables from (defaults to root namespace). */ abstract dropAllTables(namespace?: string[]): Promise; + + /** + * Clone a table from a source table. + * + * A shallow clone creates a new table that shares the underlying data files + * with the source table but has its own independent manifest. This allows + * both the source and cloned tables to evolve independently while initially + * sharing the same data, deletion, and index files. + * + * @param {string} targetTableName - The name of the target table to create. + * @param {string} sourceUri - The URI of the source table to clone from. + * @param {object} options - Clone options. + * @param {string[]} options.targetNamespace - The namespace for the target table (defaults to root namespace). + * @param {number} options.sourceVersion - The version of the source table to clone. + * @param {string} options.sourceTag - The tag of the source table to clone. + * @param {boolean} options.isShallow - Whether to perform a shallow clone (defaults to true). + */ + abstract cloneTable( + targetTableName: string, + sourceUri: string, + options?: { + targetNamespace?: string[]; + sourceVersion?: number; + sourceTag?: string; + isShallow?: boolean; + }, + ): Promise; } /** @hideconstructor */ @@ -332,6 +359,28 @@ export class LocalConnection extends Connection { return new LocalTable(innerTable); } + async cloneTable( + targetTableName: string, + sourceUri: string, + options?: { + targetNamespace?: string[]; + sourceVersion?: number; + sourceTag?: string; + isShallow?: boolean; + }, + ): Promise
{ + const innerTable = await this.inner.cloneTable( + targetTableName, + sourceUri, + options?.targetNamespace ?? [], + options?.sourceVersion ?? null, + options?.sourceTag ?? null, + options?.isShallow ?? true, + ); + + return new LocalTable(innerTable); + } + private getStorageOptions( options?: Partial, ): Record | undefined { diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index e3849038..c3c019d7 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -213,6 +213,36 @@ impl Connection { Ok(Table::new(tbl)) } + #[napi(catch_unwind)] + pub async fn clone_table( + &self, + target_table_name: String, + source_uri: String, + target_namespace: Vec, + source_version: Option, + source_tag: Option, + is_shallow: bool, + ) -> napi::Result
{ + let mut builder = self + .get_inner()? + .clone_table(&target_table_name, &source_uri); + + builder = builder.target_namespace(target_namespace); + + if let Some(version) = source_version { + builder = builder.source_version(version as u64); + } + + if let Some(tag) = source_tag { + builder = builder.source_tag(tag); + } + + builder = builder.is_shallow(is_shallow); + + let tbl = builder.execute().await.default_error()?; + Ok(Table::new(tbl)) + } + /// Drop table with the name. Or raise an error if the table does not exist. #[napi(catch_unwind)] pub async fn drop_table(&self, name: String, namespace: Vec) -> napi::Result<()> { diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index b6b86af1..5a26c9a9 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -60,6 +60,15 @@ class Connection(object): storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> Table: ... + async def clone_table( + self, + target_table_name: str, + source_uri: str, + target_namespace: List[str] = [], + source_version: Optional[int] = None, + source_tag: Optional[str] = None, + is_shallow: bool = True, + ) -> Table: ... async def rename_table( self, cur_name: str, diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 2de54694..d3aa1fe5 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -665,6 +665,60 @@ class LanceDBConnection(DBConnection): index_cache_size=index_cache_size, ) + def clone_table( + self, + target_table_name: str, + source_uri: str, + *, + target_namespace: List[str] = [], + source_version: Optional[int] = None, + source_tag: Optional[str] = None, + is_shallow: bool = True, + ) -> LanceTable: + """Clone a table from a source table. + + A shallow clone creates a new table that shares the underlying data files + with the source table but has its own independent manifest. This allows + both the source and cloned tables to evolve independently while initially + sharing the same data, deletion, and index files. + + Parameters + ---------- + target_table_name: str + The name of the target table to create. + source_uri: str + The URI of the source table to clone from. + target_namespace: List[str], optional + The namespace for the target table. + None or empty list represents root namespace. + source_version: int, optional + The version of the source table to clone. + source_tag: str, optional + The tag of the source table to clone. + is_shallow: bool, default True + Whether to perform a shallow clone (True) or deep clone (False). + Currently only shallow clone is supported. + + Returns + ------- + A LanceTable object representing the cloned table. + """ + LOOP.run( + self._conn.clone_table( + target_table_name, + source_uri, + target_namespace=target_namespace, + source_version=source_version, + source_tag=source_tag, + is_shallow=is_shallow, + ) + ) + return LanceTable.open( + self, + target_table_name, + namespace=target_namespace, + ) + @override def drop_table( self, @@ -1136,6 +1190,54 @@ class AsyncConnection(object): ) return AsyncTable(table) + async def clone_table( + self, + target_table_name: str, + source_uri: str, + *, + target_namespace: List[str] = [], + source_version: Optional[int] = None, + source_tag: Optional[str] = None, + is_shallow: bool = True, + ) -> AsyncTable: + """Clone a table from a source table. 
+ + A shallow clone creates a new table that shares the underlying data files + with the source table but has its own independent manifest. This allows + both the source and cloned tables to evolve independently while initially + sharing the same data, deletion, and index files. + + Parameters + ---------- + target_table_name: str + The name of the target table to create. + source_uri: str + The URI of the source table to clone from. + target_namespace: List[str], optional + The namespace for the target table. + None or empty list represents root namespace. + source_version: int, optional + The version of the source table to clone. + source_tag: str, optional + The tag of the source table to clone. + is_shallow: bool, default True + Whether to perform a shallow clone (True) or deep clone (False). + Currently only shallow clone is supported. + + Returns + ------- + An AsyncTable object representing the cloned table. + """ + table = await self._inner.clone_table( + target_table_name, + source_uri, + target_namespace=target_namespace, + source_version=source_version, + source_tag=source_tag, + is_shallow=is_shallow, + ) + return AsyncTable(table) + async def rename_table( self, cur_name: str, diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index a337cf51..b839213e 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -212,6 +212,53 @@ class RemoteDBConnection(DBConnection): table = LOOP.run(self._conn.open_table(name, namespace=namespace)) return RemoteTable(table, self.db_name) + def clone_table( + self, + target_table_name: str, + source_uri: str, + *, + target_namespace: List[str] = [], + source_version: Optional[int] = None, + source_tag: Optional[str] = None, + is_shallow: bool = True, + ) -> Table: + """Clone a table from a source table. + + Parameters + ---------- + target_table_name: str + The name of the target table to create. + source_uri: str + The URI of the source table to clone from. + target_namespace: List[str], optional + The namespace for the target table. + None or empty list represents root namespace. + source_version: int, optional + The version of the source table to clone. + source_tag: str, optional + The tag of the source table to clone. + is_shallow: bool, default True + Whether to perform a shallow clone (True) or deep clone (False). + Currently only shallow clone is supported. + + Returns + ------- + A RemoteTable object representing the cloned table. 
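+
+        Examples
+        --------
+        A minimal sketch, assuming an existing remote connection ``db`` and a
+        source table URI (both are placeholders for illustration):
+
+        >>> cloned = db.clone_table(  # doctest: +SKIP
+        ...     "cloned_table",
+        ...     "s3://bucket/source_table",
+        ...     source_tag="v1.0",
+        ... )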
+ """ + from .table import RemoteTable + + table = LOOP.run( + self._conn.clone_table( + target_table_name, + source_uri, + target_namespace=target_namespace, + source_version=source_version, + source_tag=source_tag, + is_shallow=is_shallow, + ) + ) + return RemoteTable(table, self.db_name) + @override def create_table( self, diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index f6c6fb97..1f0e9d21 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -831,3 +831,119 @@ def test_local_table_operations_with_namespace_raise_error(tmp_path): # Test table_names without namespace - should work normally tables_root = list(db.table_names()) assert "test_table" in tables_root + + +def test_clone_table_latest_version(tmp_path): + """Test cloning a table with the latest version (default behavior)""" + import os + + db = lancedb.connect(tmp_path) + + # Create source table with some data + data = [ + {"id": 1, "text": "hello", "vector": [1.0, 2.0]}, + {"id": 2, "text": "world", "vector": [3.0, 4.0]}, + ] + source_table = db.create_table("source", data=data) + + # Add more data to create a new version + more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}] + source_table.add(more_data) + + # Clone the table (should get latest version with 3 rows) + source_uri = os.path.join(tmp_path, "source.lance") + cloned_table = db.clone_table("cloned", source_uri) + + # Verify cloned table has all 3 rows + assert cloned_table.count_rows() == 3 + assert "cloned" in db.table_names() + + # Verify data matches + cloned_data = cloned_table.to_pandas() + assert len(cloned_data) == 3 + assert set(cloned_data["id"].tolist()) == {1, 2, 3} + + +def test_clone_table_specific_version(tmp_path): + """Test cloning a table from a specific version""" + import os + + db = lancedb.connect(tmp_path) + + # Create source table with initial data + data = [ + {"id": 1, "text": "hello", "vector": [1.0, 2.0]}, + {"id": 2, "text": "world", "vector": [3.0, 4.0]}, + ] + source_table = db.create_table("source", data=data) + + # Get the initial version + initial_version = source_table.version + + # Add more data to create a new version + more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}] + source_table.add(more_data) + + # Verify source now has 3 rows + assert source_table.count_rows() == 3 + + # Clone from the initial version (should have only 2 rows) + source_uri = os.path.join(tmp_path, "source.lance") + cloned_table = db.clone_table("cloned", source_uri, source_version=initial_version) + + # Verify cloned table has only the initial 2 rows + assert cloned_table.count_rows() == 2 + cloned_data = cloned_table.to_pandas() + assert set(cloned_data["id"].tolist()) == {1, 2} + + +def test_clone_table_with_tag(tmp_path): + """Test cloning a table from a tagged version""" + import os + + db = lancedb.connect(tmp_path) + + # Create source table with initial data + data = [ + {"id": 1, "text": "hello", "vector": [1.0, 2.0]}, + {"id": 2, "text": "world", "vector": [3.0, 4.0]}, + ] + source_table = db.create_table("source", data=data) + + # Create a tag for the current version + source_table.tags.create("v1.0", source_table.version) + + # Add more data after the tag + more_data = [{"id": 3, "text": "test", "vector": [5.0, 6.0]}] + source_table.add(more_data) + + # Verify source now has 3 rows + assert source_table.count_rows() == 3 + + # Clone from the tagged version (should have only 2 rows) + source_uri = os.path.join(tmp_path, "source.lance") + cloned_table = 
db.clone_table("cloned", source_uri, source_tag="v1.0") + + # Verify cloned table has only the tagged version's 2 rows + assert cloned_table.count_rows() == 2 + cloned_data = cloned_table.to_pandas() + assert set(cloned_data["id"].tolist()) == {1, 2} + + +def test_clone_table_deep_clone_fails(tmp_path): + """Test that deep clone raises an unsupported error""" + import os + + db = lancedb.connect(tmp_path) + + # Create source table with some data + data = [ + {"id": 1, "text": "hello", "vector": [1.0, 2.0]}, + {"id": 2, "text": "world", "vector": [3.0, 4.0]}, + ] + db.create_table("source", data=data) + + # Try to create a deep clone (should fail) + source_uri = os.path.join(tmp_path, "source.lance") + with pytest.raises(Exception, match="Deep clone is not yet implemented"): + db.clone_table("cloned", source_uri, is_shallow=False) diff --git a/python/src/connection.rs b/python/src/connection.rs index ca426c3b..13782dca 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -163,6 +163,34 @@ impl Connection { }) } + #[pyo3(signature = (target_table_name, source_uri, target_namespace=vec![], source_version=None, source_tag=None, is_shallow=true))] + pub fn clone_table( + self_: PyRef<'_, Self>, + target_table_name: String, + source_uri: String, + target_namespace: Vec, + source_version: Option, + source_tag: Option, + is_shallow: bool, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + + let mut builder = inner.clone_table(target_table_name, source_uri); + builder = builder.target_namespace(target_namespace); + if let Some(version) = source_version { + builder = builder.source_version(version); + } + if let Some(tag) = source_tag { + builder = builder.source_tag(tag); + } + builder = builder.is_shallow(is_shallow); + + future_into_py(self_.py(), async move { + let table = builder.execute().await.infer_error()?; + Ok(Table::new(table)) + }) + } + #[pyo3(signature = (cur_name, new_name, cur_namespace=vec![], new_namespace=vec![]))] pub fn rename_table( self_: PyRef<'_, Self>, diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index a4df666b..41ddad43 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -17,9 +17,9 @@ use crate::database::listing::{ ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, }; use crate::database::{ - CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, - DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, - TableNamesRequest, + CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, + CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, + OpenTableRequest, TableNamesRequest, }; use crate::embeddings::{ EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings, @@ -469,6 +469,62 @@ impl OpenTableBuilder { } } +/// Builder for cloning a table. +/// +/// A shallow clone creates a new table that shares the underlying data files +/// with the source table but has its own independent manifest. Both the source +/// and cloned tables can evolve independently while initially sharing the same +/// data, deletion, and index files. +/// +/// Use this builder to configure the clone operation before executing it. 
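+///
+/// # Example
+///
+/// A minimal sketch, assuming an existing [`Connection`] named `db` and a
+/// `source_uri` pointing at an existing Lance table (both are placeholders):
+///
+/// ```ignore
+/// let cloned = db
+///     .clone_table("cloned_table", source_uri)
+///     .source_tag("v1.0")
+///     .execute()
+///     .await?;
+/// ```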
+pub struct CloneTableBuilder {
+    parent: Arc<dyn Database>,
+    request: CloneTableRequest,
+}
+
+impl CloneTableBuilder {
+    fn new(parent: Arc<dyn Database>, target_table_name: String, source_uri: String) -> Self {
+        Self {
+            parent,
+            request: CloneTableRequest::new(target_table_name, source_uri),
+        }
+    }
+
+    /// Set the source version to clone from
+    pub fn source_version(mut self, version: u64) -> Self {
+        self.request.source_version = Some(version);
+        self
+    }
+
+    /// Set the source tag to clone from
+    pub fn source_tag(mut self, tag: impl Into<String>) -> Self {
+        self.request.source_tag = Some(tag.into());
+        self
+    }
+
+    /// Set the target namespace for the cloned table
+    pub fn target_namespace(mut self, namespace: Vec<String>) -> Self {
+        self.request.target_namespace = namespace;
+        self
+    }
+
+    /// Set whether to perform a shallow clone (default: true)
+    ///
+    /// When true, the cloned table shares data files with the source table.
+    /// When false, performs a deep clone (not yet implemented).
+    pub fn is_shallow(mut self, is_shallow: bool) -> Self {
+        self.request.is_shallow = is_shallow;
+        self
+    }
+
+    /// Execute the clone operation
+    pub async fn execute(self) -> Result<Table>
{ + Ok(Table::new( + self.parent.clone().clone_table(self.request).await?, + )) + } +} + /// A connection to LanceDB #[derive(Clone)] pub struct Connection { @@ -575,6 +631,30 @@ impl Connection { ) } + /// Clone a table in the database + /// + /// Creates a new table by cloning from an existing source table. + /// By default, this performs a shallow clone where the new table shares + /// the underlying data files with the source table. + /// + /// # Parameters + /// - `target_table_name`: The name of the new table to create + /// - `source_uri`: The URI of the source table to clone from + /// + /// # Returns + /// A [`CloneTableBuilder`] that can be used to configure the clone operation + pub fn clone_table( + &self, + target_table_name: impl Into, + source_uri: impl Into, + ) -> CloneTableBuilder { + CloneTableBuilder::new( + self.internal.clone(), + target_table_name.into(), + source_uri.into(), + ) + } + /// Rename a table in the database. /// /// This is only supported in LanceDB Cloud. @@ -1281,4 +1361,50 @@ mod tests { .unwrap(); assert_eq!(other_schema, overwritten.schema().await.unwrap()); } + + #[tokio::test] + async fn test_clone_table() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let db = connect(uri).execute().await.unwrap(); + + // Create a source table with some data + let mut batch_gen = BatchGenerator::new() + .col(Box::new(IncrementingInt32::new().named("id"))) + .col(Box::new(IncrementingInt32::new().named("value"))); + let reader = batch_gen.batches(5, 100); + + let source_table = db + .create_table("source_table", reader) + .execute() + .await + .unwrap(); + + // Get the source table URI + let source_table_path = tmp_dir.path().join("source_table.lance"); + let source_uri = source_table_path.to_str().unwrap(); + + // Clone the table + let cloned_table = db + .clone_table("cloned_table", source_uri) + .execute() + .await + .unwrap(); + + // Verify the cloned table exists + let table_names = db.table_names().execute().await.unwrap(); + assert!(table_names.contains(&"source_table".to_string())); + assert!(table_names.contains(&"cloned_table".to_string())); + + // Verify the cloned table has the same schema + assert_eq!( + source_table.schema().await.unwrap(), + cloned_table.schema().await.unwrap() + ); + + // Verify the cloned table has the same data + let source_count = source_table.count_rows(None).await.unwrap(); + let cloned_count = cloned_table.count_rows(None).await.unwrap(); + assert_eq!(source_count, cloned_count); + } } diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index fe83d508..2f22ea10 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -176,6 +176,42 @@ impl CreateTableRequest { } } +/// Request to clone a table from a source table. +/// +/// A shallow clone creates a new table that shares the underlying data files +/// with the source table but has its own independent manifest. This allows +/// both the source and cloned tables to evolve independently while initially +/// sharing the same data, deletion, and index files. +#[derive(Clone, Debug)] +pub struct CloneTableRequest { + /// The name of the target table to create + pub target_table_name: String, + /// The namespace for the target table. Empty list represents root namespace. + pub target_namespace: Vec, + /// The URI of the source table to clone from. + pub source_uri: String, + /// Optional version of the source table to clone. 
+ pub source_version: Option, + /// Optional tag of the source table to clone. + pub source_tag: Option, + /// Whether to perform a shallow clone (true) or deep clone (false). Defaults to true. + /// Currently only shallow clone is supported. + pub is_shallow: bool, +} + +impl CloneTableRequest { + pub fn new(target_table_name: String, source_uri: String) -> Self { + Self { + target_table_name, + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + } + } +} + /// The `Database` trait defines the interface for database implementations. /// /// A database is responsible for managing tables and their metadata. @@ -193,6 +229,13 @@ pub trait Database: async fn table_names(&self, request: TableNamesRequest) -> Result>; /// Create a table in the database async fn create_table(&self, request: CreateTableRequest) -> Result>; + /// Clone a table in the database. + /// + /// Creates a shallow clone of the source table, sharing underlying data files + /// but with an independent manifest. Both tables can evolve separately after cloning. + /// + /// See [`CloneTableRequest`] for detailed documentation and examples. + async fn clone_table(&self, request: CloneTableRequest) -> Result>; /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; /// Rename a table in the database diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 35fd07e3..16b451ce 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -7,7 +7,8 @@ use std::fs::create_dir_all; use std::path::Path; use std::{collections::HashMap, sync::Arc}; -use lance::dataset::{ReadParams, WriteMode}; +use lance::dataset::refs::Ref; +use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode}; use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore}; use lance_datafusion::utils::StreamingWriteSource; use lance_encoding::version::LanceFileVersion; @@ -22,8 +23,8 @@ use crate::table::NativeTable; use crate::utils::validate_table_name; use super::{ - BaseTable, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, Database, - DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, + BaseTable, CloneTableRequest, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, + Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, TableNamesRequest, }; @@ -684,6 +685,65 @@ impl Database for ListingDatabase { } } + async fn clone_table(&self, request: CloneTableRequest) -> Result> { + if !request.target_namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database. 
Only root namespace is supported.".into(), + }); + } + + // TODO: support deep clone + if !request.is_shallow { + return Err(Error::NotSupported { + message: "Deep clone is not yet implemented".to_string(), + }); + } + + validate_table_name(&request.target_table_name)?; + + let storage_params = ObjectStoreParams { + storage_options: Some(self.storage_options.clone()), + ..Default::default() + }; + let read_params = ReadParams { + store_options: Some(storage_params.clone()), + session: Some(self.session.clone()), + ..Default::default() + }; + + let mut source_dataset = DatasetBuilder::from_uri(&request.source_uri) + .with_read_params(read_params.clone()) + .load() + .await + .map_err(|e| Error::Lance { source: e })?; + + let version_ref = match (request.source_version, request.source_tag) { + (Some(v), None) => Ok(Ref::Version(v)), + (None, Some(tag)) => Ok(Ref::Tag(tag)), + (None, None) => Ok(Ref::Version(source_dataset.version().version)), + _ => Err(Error::InvalidInput { + message: "Cannot specify both source_version and source_tag".to_string(), + }), + }?; + + let target_uri = self.table_uri(&request.target_table_name)?; + source_dataset + .shallow_clone(&target_uri, version_ref, storage_params) + .await + .map_err(|e| Error::Lance { source: e })?; + + let cloned_table = NativeTable::open_with_params( + &target_uri, + &request.target_table_name, + self.store_wrapper.clone(), + None, + self.read_consistency_interval, + ) + .await?; + + Ok(Arc::new(cloned_table)) + } + async fn open_table(&self, mut request: OpenTableRequest) -> Result> { if !request.namespace.is_empty() { return Err(Error::NotSupported { @@ -785,3 +845,694 @@ impl Database for ListingDatabase { self } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::connection::ConnectRequest; + use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest}; + use crate::table::{Table, TableDefinition}; + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use tempfile::tempdir; + + async fn setup_database() -> (tempfile::TempDir, ListingDatabase) { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: Default::default(), + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + + (tempdir, db) + } + + #[tokio::test] + async fn test_clone_table_basic() { + let (_tempdir, db) = setup_database().await; + + // Create a source table with schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let source_table = db + .create_table(CreateTableRequest { + name: "source_table".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema.clone())), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + // Get the source table URI + let source_uri = db.table_uri("source_table").unwrap(); + + // Clone the table + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "cloned_table".to_string(), + target_namespace: vec![], + source_uri: source_uri.clone(), + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await + .unwrap(); + + // Verify both tables exist + let table_names = 
db.table_names(TableNamesRequest::default()).await.unwrap(); + assert!(table_names.contains(&"source_table".to_string())); + assert!(table_names.contains(&"cloned_table".to_string())); + + // Verify schemas match + assert_eq!( + source_table.schema().await.unwrap(), + cloned_table.schema().await.unwrap() + ); + } + + #[tokio::test] + async fn test_clone_table_with_data() { + let (_tempdir, db) = setup_database().await; + + // Create a source table with actual data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch)], + schema.clone(), + )); + + let source_table = db + .create_table(CreateTableRequest { + name: "source_with_data".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("source_with_data").unwrap(); + + // Clone the table + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "cloned_with_data".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await + .unwrap(); + + // Verify data counts match + let source_count = source_table.count_rows(None).await.unwrap(); + let cloned_count = cloned_table.count_rows(None).await.unwrap(); + assert_eq!(source_count, cloned_count); + assert_eq!(source_count, 3); + } + + #[tokio::test] + async fn test_clone_table_with_storage_options() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + // Create database with storage options + let mut options = HashMap::new(); + options.insert("test_option".to_string(), "test_value".to_string()); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: options.clone(), + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + + // Create source table + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + db.create_table(CreateTableRequest { + name: "source".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("source").unwrap(); + + // Clone should work with storage options + let cloned = db + .clone_table(CloneTableRequest { + target_table_name: "cloned".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await; + + assert!(cloned.is_ok()); + } + + #[tokio::test] + async fn test_clone_table_deep_not_supported() { + let (_tempdir, db) = setup_database().await; + + // Create a source table + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + db.create_table(CreateTableRequest { + name: "source".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + 
+ let source_uri = db.table_uri("source").unwrap(); + + // Try deep clone (should fail) + let result = db + .clone_table(CloneTableRequest { + target_table_name: "cloned".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: false, // Request deep clone + }) + .await; + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::NotSupported { message } if message.contains("Deep clone") + )); + } + + #[tokio::test] + async fn test_clone_table_with_namespace_not_supported() { + let (_tempdir, db) = setup_database().await; + + // Create a source table + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + db.create_table(CreateTableRequest { + name: "source".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("source").unwrap(); + + // Try clone with namespace (should fail for listing database) + let result = db + .clone_table(CloneTableRequest { + target_table_name: "cloned".to_string(), + target_namespace: vec!["namespace".to_string()], // Non-empty namespace + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await; + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::NotSupported { message } if message.contains("Namespace parameter is not supported") + )); + } + + #[tokio::test] + async fn test_clone_table_invalid_target_name() { + let (_tempdir, db) = setup_database().await; + + // Create a source table + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + db.create_table(CreateTableRequest { + name: "source".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("source").unwrap(); + + // Try clone with invalid target name + let result = db + .clone_table(CloneTableRequest { + target_table_name: "invalid/name".to_string(), // Invalid name with slash + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_clone_table_source_not_found() { + let (_tempdir, db) = setup_database().await; + + // Try to clone from non-existent source + let result = db + .clone_table(CloneTableRequest { + target_table_name: "cloned".to_string(), + target_namespace: vec![], + source_uri: "/nonexistent/table.lance".to_string(), + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_clone_table_with_version_and_tag_error() { + let (_tempdir, db) = setup_database().await; + + // Create a source table + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + db.create_table(CreateTableRequest { + name: "source".to_string(), + namespace: vec![], + data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("source").unwrap(); + + // Try clone with both version and tag (should fail) + let result = db + .clone_table(CloneTableRequest { + 
target_table_name: "cloned".to_string(), + target_namespace: vec![], + source_uri, + source_version: Some(1), + source_tag: Some("v1.0".to_string()), + is_shallow: true, + }) + .await; + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::InvalidInput { message } if message.contains("Cannot specify both source_version and source_tag") + )); + } + + #[tokio::test] + async fn test_clone_table_with_specific_version() { + let (_tempdir, db) = setup_database().await; + + // Create a source table with initial data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ])); + + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["a", "b"])), + ], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch1)], + schema.clone(), + )); + + let source_table = db + .create_table(CreateTableRequest { + name: "versioned_source".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + // Get the initial version + let initial_version = source_table.version().await.unwrap(); + + // Add more data to create a new version + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["c", "d"])), + ], + ) + .unwrap(); + + let source_table_obj = Table::new(source_table.clone()); + source_table_obj + .add(Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch2)], + schema.clone(), + ))) + .execute() + .await + .unwrap(); + + // Verify source table now has 4 rows + assert_eq!(source_table.count_rows(None).await.unwrap(), 4); + + let source_uri = db.table_uri("versioned_source").unwrap(); + + // Clone from the initial version (should have only 2 rows) + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "cloned_from_version".to_string(), + target_namespace: vec![], + source_uri, + source_version: Some(initial_version), + source_tag: None, + is_shallow: true, + }) + .await + .unwrap(); + + // Verify cloned table has only the initial 2 rows + assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2); + + // Source table should still have 4 rows + assert_eq!(source_table.count_rows(None).await.unwrap(), 4); + } + + #[tokio::test] + async fn test_clone_table_with_tag() { + let (_tempdir, db) = setup_database().await; + + // Create a source table with initial data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ])); + + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["a", "b"])), + ], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch1)], + schema.clone(), + )); + + let source_table = db + .create_table(CreateTableRequest { + name: "tagged_source".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + // Create a tag for the current version + let source_table_obj = Table::new(source_table.clone()); + let mut tags = source_table_obj.tags().await.unwrap(); + tags.create("v1.0", source_table.version().await.unwrap()) + .await + 
.unwrap(); + + // Add more data after the tag + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["c", "d"])), + ], + ) + .unwrap(); + + let source_table_obj = Table::new(source_table.clone()); + source_table_obj + .add(Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch2)], + schema.clone(), + ))) + .execute() + .await + .unwrap(); + + // Source table should have 4 rows + assert_eq!(source_table.count_rows(None).await.unwrap(), 4); + + let source_uri = db.table_uri("tagged_source").unwrap(); + + // Clone from the tag (should have only 2 rows) + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "cloned_from_tag".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: Some("v1.0".to_string()), + is_shallow: true, + }) + .await + .unwrap(); + + // Verify cloned table has only the tagged version's 2 rows + assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2); + } + + #[tokio::test] + async fn test_cloned_tables_evolve_independently() { + let (_tempdir, db) = setup_database().await; + + // Create a source table with initial data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ])); + + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["a", "b"])), + ], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch1)], + schema.clone(), + )); + + let source_table = db + .create_table(CreateTableRequest { + name: "independent_source".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + let source_uri = db.table_uri("independent_source").unwrap(); + + // Clone the table + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "independent_clone".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await + .unwrap(); + + // Both should start with 2 rows + assert_eq!(source_table.count_rows(None).await.unwrap(), 2); + assert_eq!(cloned_table.count_rows(None).await.unwrap(), 2); + + // Add data to the cloned table + let batch_clone = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![3, 4, 5])), + Arc::new(StringArray::from(vec!["c", "d", "e"])), + ], + ) + .unwrap(); + + let cloned_table_obj = Table::new(cloned_table.clone()); + cloned_table_obj + .add(Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch_clone)], + schema.clone(), + ))) + .execute() + .await + .unwrap(); + + // Add different data to the source table + let batch_source = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![10, 11])), + Arc::new(StringArray::from(vec!["x", "y"])), + ], + ) + .unwrap(); + + let source_table_obj = Table::new(source_table.clone()); + source_table_obj + .add(Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch_source)], + schema.clone(), + ))) + .execute() + .await + .unwrap(); + + // Verify they have evolved independently + assert_eq!(source_table.count_rows(None).await.unwrap(), 4); // 2 + 2 + assert_eq!(cloned_table.count_rows(None).await.unwrap(), 5); // 2 + 3 + } + + #[tokio::test] + async fn test_clone_latest_version() { + let 
(_tempdir, db) = setup_database().await; + + // Create a source table with initial data + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let batch1 = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch1)], + schema.clone(), + )); + + let source_table = db + .create_table(CreateTableRequest { + name: "latest_version_source".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + }) + .await + .unwrap(); + + // Add more data to create new versions + for i in 0..3 { + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![i * 10, i * 10 + 1]))], + ) + .unwrap(); + + let source_table_obj = Table::new(source_table.clone()); + source_table_obj + .add(Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch)], + schema.clone(), + ))) + .execute() + .await + .unwrap(); + } + + // Source should have 8 rows total (2 + 2 + 2 + 2) + let source_count = source_table.count_rows(None).await.unwrap(); + assert_eq!(source_count, 8); + + let source_uri = db.table_uri("latest_version_source").unwrap(); + + // Clone without specifying version or tag (should get latest) + let cloned_table = db + .clone_table(CloneTableRequest { + target_table_name: "cloned_latest".to_string(), + target_namespace: vec![], + source_uri, + source_version: None, + source_tag: None, + is_shallow: true, + }) + .await + .unwrap(); + + // Cloned table should have all 8 rows from the latest version + assert_eq!(cloned_table.count_rows(None).await.unwrap(), 8); + } +} diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index d16226af..41bd6128 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -14,9 +14,9 @@ use serde::Deserialize; use tokio::task::spawn_blocking; use crate::database::{ - CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, - DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, - TableNamesRequest, + CloneTableRequest, CreateNamespaceRequest, CreateTableData, CreateTableMode, + CreateTableRequest, Database, DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, + OpenTableRequest, TableNamesRequest, }; use crate::error::Result; use crate::table::BaseTable; @@ -27,6 +27,18 @@ use super::table::RemoteTable; use super::util::{batches_to_ipc_bytes, parse_server_version}; use super::ARROW_STREAM_CONTENT_TYPE; +// Request structure for the remote clone table API +#[derive(serde::Serialize)] +struct RemoteCloneTableRequest { + source_location: String, + #[serde(skip_serializing_if = "Option::is_none")] + source_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + source_tag: Option, + #[serde(skip_serializing_if = "Option::is_none")] + is_shallow: Option, +} + // the versions of the server that we support // for any new feature that we need to change the SDK behavior, we should bump the server version, // and add a feature flag as method of `ServerVersion` here. 
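+
+// Illustrative wire format, sketched from `RemoteCloneTableRequest` above and the
+// request built in `clone_table` below; the example values are placeholders, and
+// `None` fields are omitted via `skip_serializing_if`:
+//
+//   POST /v1/table/{table_identifier}/clone
+//   {
+//     "source_location": "s3://bucket/source_table",
+//     "source_version": 42,
+//     "is_shallow": true
+//   }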
@@ -430,6 +442,51 @@ impl Database for RemoteDatabase { Ok(table) } + async fn clone_table(&self, request: CloneTableRequest) -> Result> { + let table_identifier = build_table_identifier( + &request.target_table_name, + &request.target_namespace, + &self.client.id_delimiter, + ); + + let remote_request = RemoteCloneTableRequest { + source_location: request.source_uri, + source_version: request.source_version, + source_tag: request.source_tag, + is_shallow: Some(request.is_shallow), + }; + + let req = self + .client + .post(&format!("/v1/table/{}/clone", table_identifier.clone())) + .json(&remote_request); + + let (request_id, rsp) = self.client.send(req).await?; + + let status = rsp.status(); + if status != StatusCode::OK { + let body = rsp.text().await.err_to_http(request_id.clone())?; + return Err(crate::Error::Http { + source: format!("Failed to clone table: {}", body).into(), + request_id, + status_code: Some(status), + }); + } + + let version = parse_server_version(&request_id, &rsp)?; + let cache_key = build_cache_key(&request.target_table_name, &request.target_namespace); + let table = Arc::new(RemoteTable::new( + self.client.clone(), + request.target_table_name.clone(), + request.target_namespace.clone(), + table_identifier, + version, + )); + self.table_cache.insert(cache_key, table.clone()).await; + + Ok(table) + } + async fn open_table(&self, request: OpenTableRequest) -> Result> { let identifier = build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter); @@ -1221,4 +1278,146 @@ mod tests { _ => panic!("Expected Runtime error from header provider"), } } + + #[tokio::test] + async fn test_clone_table() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/cloned_table/clone"); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + JSON_CONTENT_TYPE + ); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + assert_eq!(body["source_location"], "s3://bucket/source_table"); + assert_eq!(body["is_shallow"], true); + + http::Response::builder().status(200).body("").unwrap() + }); + + let table = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "cloned_table"); + } + + #[tokio::test] + async fn test_clone_table_with_version() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/cloned_table/clone"); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + assert_eq!(body["source_location"], "s3://bucket/source_table"); + assert_eq!(body["source_version"], 42); + assert_eq!(body["is_shallow"], true); + + http::Response::builder().status(200).body("").unwrap() + }); + + let table = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .source_version(42) + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "cloned_table"); + } + + #[tokio::test] + async fn test_clone_table_with_tag() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/cloned_table/clone"); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + 
assert_eq!(body["source_location"], "s3://bucket/source_table"); + assert_eq!(body["source_tag"], "v1.0"); + assert_eq!(body["is_shallow"], true); + + http::Response::builder().status(200).body("").unwrap() + }); + + let table = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .source_tag("v1.0") + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "cloned_table"); + } + + #[tokio::test] + async fn test_clone_table_deep_clone() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/cloned_table/clone"); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + assert_eq!(body["source_location"], "s3://bucket/source_table"); + assert_eq!(body["is_shallow"], false); + + http::Response::builder().status(200).body("").unwrap() + }); + + let table = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .is_shallow(false) + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "cloned_table"); + } + + #[tokio::test] + async fn test_clone_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/ns1$ns2$cloned_table/clone"); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + assert_eq!(body["source_location"], "s3://bucket/source_table"); + assert_eq!(body["is_shallow"], true); + + http::Response::builder().status(200).body("").unwrap() + }); + + let table = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .target_namespace(vec!["ns1".to_string(), "ns2".to_string()]) + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "cloned_table"); + } + + #[tokio::test] + async fn test_clone_table_error() { + let conn = Connection::new_with_handler(|_| { + http::Response::builder() + .status(500) + .body("Internal server error") + .unwrap() + }); + + let result = conn + .clone_table("cloned_table", "s3://bucket/source_table") + .execute() + .await; + + assert!(result.is_err()); + if let Err(crate::Error::Http { source, .. }) = result { + assert!(source.to_string().contains("Failed to clone table")); + } else { + panic!("Expected HTTP error"); + } + } }
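
A minimal end-to-end sketch of the new Rust API introduced in this change, for orientation only; the database path, table names, and `reader` (any Arrow record-batch source) are placeholders:

    let db = connect("/tmp/lancedb").execute().await?;
    let source = db.create_table("source_table", reader).execute().await?;
    let cloned = db
        .clone_table("cloned_table", "/tmp/lancedb/source_table.lance")
        .execute()
        .await?;
    assert_eq!(
        source.count_rows(None).await?,
        cloned.count_rows(None).await?
    );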