diff --git a/nodejs/__test__/s3_integration.test.ts b/nodejs/__test__/s3_integration.test.ts index ce194202..e4bc4680 100644 --- a/nodejs/__test__/s3_integration.test.ts +++ b/nodejs/__test__/s3_integration.test.ts @@ -175,6 +175,8 @@ maybeDescribe("storage_options", () => { tableNames = await db.tableNames(); expect(tableNames).toEqual([]); + + await db.dropAllTables(); }); it("can configure encryption at connection and table level", async () => { @@ -210,6 +212,8 @@ maybeDescribe("storage_options", () => { await table.add([{ a: 2, b: 3 }]); await bucket.assertAllEncrypted("test/table2.lance", kmsKey.keyId); + + await db.dropAllTables(); }); }); @@ -298,5 +302,32 @@ maybeDescribe("DynamoDB Lock", () => { const rowCount = await table.countRows(); expect(rowCount).toBe(6); + + await db.dropAllTables(); + }); + + it("clears dynamodb state after dropping all tables", async () => { + const uri = `s3+ddb://${bucket.name}/test?ddbTableName=${commitTable.name}`; + const db = await connect(uri, { + storageOptions: CONFIG, + readConsistencyInterval: 0, + }); + + await db.createTable("foo", [{ a: 1, b: 2 }]); + await db.createTable("bar", [{ a: 1, b: 2 }]); + + let tableNames = await db.tableNames(); + expect(tableNames).toEqual(["bar", "foo"]); + + await db.dropAllTables(); + tableNames = await db.tableNames(); + expect(tableNames).toEqual([]); + + // We can create a new table with the same name as the one we dropped. + await db.createTable("foo", [{ a: 1, b: 2 }]); + tableNames = await db.tableNames(); + expect(tableNames).toEqual(["foo"]); + + await db.dropAllTables(); }); }); diff --git a/python/Makefile b/python/Makefile index 8ac38ac4..cb58822f 100644 --- a/python/Makefile +++ b/python/Makefile @@ -34,3 +34,7 @@ doctest: ## Run documentation tests. .PHONY: test test: ## Run tests. pytest python/tests -vv --durations=10 -m "not slow and not s3_test" + +.PHONY: clean +clean: + rm -rf data diff --git a/python/python/tests/test_s3.py b/python/python/tests/test_s3.py index 85040998..3bee14cd 100644 --- a/python/python/tests/test_s3.py +++ b/python/python/tests/test_s3.py @@ -252,3 +252,27 @@ def test_s3_dynamodb_sync(s3_bucket: str, commit_table: str, monkeypatch): db.drop_table("test_ddb_sync") assert db.table_names() == [] db.drop_database() + + +@pytest.mark.s3_test +def test_s3_dynamodb_drop_all_tables(s3_bucket: str, commit_table: str, monkeypatch): + for key, value in CONFIG.items(): + monkeypatch.setenv(key.upper(), value) + + uri = f"s3+ddb://{s3_bucket}/test2?ddbTableName={commit_table}" + db = lancedb.connect(uri, read_consistency_interval=timedelta(0)) + data = pa.table({"x": ["a", "b", "c"]}) + + db.create_table("foo", data) + db.create_table("bar", data) + assert db.table_names() == ["bar", "foo"] + + # dropping all tables should clear multiple tables + db.drop_all_tables() + assert db.table_names() == [] + + # create a new table with the same name to ensure DDB is clean + db.create_table("foo", data) + assert db.table_names() == ["foo"] + + db.drop_all_tables() diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index fa711ca9..c47fd72a 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -322,6 +322,37 @@ impl ListingDatabase { Ok(uri) } + + async fn drop_tables(&self, names: Vec) -> Result<()> { + let object_store_params = ObjectStoreParams { + storage_options: Some(self.storage_options.clone()), + ..Default::default() + }; + let mut uri = self.uri.clone(); + if let Some(query_string) = &self.query_string { + uri.push_str(&format!("?{}", query_string)); + } + let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?; + for name in names { + let dir_name = format!("{}.{}", name, LANCE_EXTENSION); + let full_path = self.base_path.child(dir_name.clone()); + + commit_handler.delete(&full_path).await?; + + self.object_store + .remove_dir_all(full_path.clone()) + .await + .map_err(|err| match err { + // this error is not lance::Error::DatasetNotFound, as the method + // `remove_dir_all` may be used to remove something not be a dataset + lance::Error::NotFound { .. } => Error::TableNotFound { + name: name.to_owned(), + }, + _ => Error::from(err), + })?; + } + Ok(()) + } } #[async_trait::async_trait] @@ -493,40 +524,12 @@ impl Database for ListingDatabase { } async fn drop_table(&self, name: &str) -> Result<()> { - let dir_name = format!("{}.{}", name, LANCE_EXTENSION); - let full_path = self.base_path.child(dir_name.clone()); - self.object_store - .remove_dir_all(full_path.clone()) - .await - .map_err(|err| match err { - // this error is not lance::Error::DatasetNotFound, - // as the method `remove_dir_all` may be used to remove something not be a dataset - lance::Error::NotFound { .. } => Error::TableNotFound { - name: name.to_owned(), - }, - _ => Error::from(err), - })?; - - let object_store_params = ObjectStoreParams { - storage_options: Some(self.storage_options.clone()), - ..Default::default() - }; - let mut uri = self.uri.clone(); - if let Some(query_string) = &self.query_string { - uri.push_str(&format!("?{}", query_string)); - } - let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)) - .await - .unwrap(); - commit_handler.delete(&full_path).await.unwrap(); - Ok(()) + self.drop_tables(vec![name.to_string()]).await } async fn drop_all_tables(&self) -> Result<()> { - self.object_store - .remove_dir_all(self.base_path.clone()) - .await?; - Ok(()) + let tables = self.table_names(TableNamesRequest::default()).await?; + self.drop_tables(tables).await } fn as_any(&self) -> &dyn std::any::Any {