mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-17 01:02:59 +00:00
fix: delete tables from DDB on drop_all_tables (#2194)
Prior to this commit, issuing drop_all_tables on a listing database with an external manifest store would delete physical tables but leave references behind in the manifest store. The table drop would succeed, but subsequent creation of a table with the same name would fail with a conflict. With this patch, the external manifest store is updated to account for the dropped tables so that dropped table names can be reused.
This commit is contained in:
@@ -175,6 +175,8 @@ maybeDescribe("storage_options", () => {
|
||||
|
||||
tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual([]);
|
||||
|
||||
await db.dropAllTables();
|
||||
});
|
||||
|
||||
it("can configure encryption at connection and table level", async () => {
|
||||
@@ -210,6 +212,8 @@ maybeDescribe("storage_options", () => {
|
||||
await table.add([{ a: 2, b: 3 }]);
|
||||
|
||||
await bucket.assertAllEncrypted("test/table2.lance", kmsKey.keyId);
|
||||
|
||||
await db.dropAllTables();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -298,5 +302,32 @@ maybeDescribe("DynamoDB Lock", () => {
|
||||
|
||||
const rowCount = await table.countRows();
|
||||
expect(rowCount).toBe(6);
|
||||
|
||||
await db.dropAllTables();
|
||||
});
|
||||
|
||||
it("clears dynamodb state after dropping all tables", async () => {
|
||||
const uri = `s3+ddb://${bucket.name}/test?ddbTableName=${commitTable.name}`;
|
||||
const db = await connect(uri, {
|
||||
storageOptions: CONFIG,
|
||||
readConsistencyInterval: 0,
|
||||
});
|
||||
|
||||
await db.createTable("foo", [{ a: 1, b: 2 }]);
|
||||
await db.createTable("bar", [{ a: 1, b: 2 }]);
|
||||
|
||||
let tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual(["bar", "foo"]);
|
||||
|
||||
await db.dropAllTables();
|
||||
tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual([]);
|
||||
|
||||
// We can create a new table with the same name as the one we dropped.
|
||||
await db.createTable("foo", [{ a: 1, b: 2 }]);
|
||||
tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual(["foo"]);
|
||||
|
||||
await db.dropAllTables();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -34,3 +34,7 @@ doctest: ## Run documentation tests.
|
||||
.PHONY: test
|
||||
test: ## Run tests.
|
||||
pytest python/tests -vv --durations=10 -m "not slow and not s3_test"
|
||||
|
||||
.PHONY: clean
|
||||
clean:
|
||||
rm -rf data
|
||||
|
||||
@@ -252,3 +252,27 @@ def test_s3_dynamodb_sync(s3_bucket: str, commit_table: str, monkeypatch):
|
||||
db.drop_table("test_ddb_sync")
|
||||
assert db.table_names() == []
|
||||
db.drop_database()
|
||||
|
||||
|
||||
@pytest.mark.s3_test
|
||||
def test_s3_dynamodb_drop_all_tables(s3_bucket: str, commit_table: str, monkeypatch):
|
||||
for key, value in CONFIG.items():
|
||||
monkeypatch.setenv(key.upper(), value)
|
||||
|
||||
uri = f"s3+ddb://{s3_bucket}/test2?ddbTableName={commit_table}"
|
||||
db = lancedb.connect(uri, read_consistency_interval=timedelta(0))
|
||||
data = pa.table({"x": ["a", "b", "c"]})
|
||||
|
||||
db.create_table("foo", data)
|
||||
db.create_table("bar", data)
|
||||
assert db.table_names() == ["bar", "foo"]
|
||||
|
||||
# dropping all tables should clear multiple tables
|
||||
db.drop_all_tables()
|
||||
assert db.table_names() == []
|
||||
|
||||
# create a new table with the same name to ensure DDB is clean
|
||||
db.create_table("foo", data)
|
||||
assert db.table_names() == ["foo"]
|
||||
|
||||
db.drop_all_tables()
|
||||
|
||||
@@ -322,6 +322,37 @@ impl ListingDatabase {
|
||||
|
||||
Ok(uri)
|
||||
}
|
||||
|
||||
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
|
||||
let object_store_params = ObjectStoreParams {
|
||||
storage_options: Some(self.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let mut uri = self.uri.clone();
|
||||
if let Some(query_string) = &self.query_string {
|
||||
uri.push_str(&format!("?{}", query_string));
|
||||
}
|
||||
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?;
|
||||
for name in names {
|
||||
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
|
||||
let full_path = self.base_path.child(dir_name.clone());
|
||||
|
||||
commit_handler.delete(&full_path).await?;
|
||||
|
||||
self.object_store
|
||||
.remove_dir_all(full_path.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
// this error is not lance::Error::DatasetNotFound, as the method
|
||||
// `remove_dir_all` may be used to remove something that is not a dataset
|
||||
lance::Error::NotFound { .. } => Error::TableNotFound {
|
||||
name: name.to_owned(),
|
||||
},
|
||||
_ => Error::from(err),
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -493,40 +524,12 @@ impl Database for ListingDatabase {
|
||||
}
|
||||
|
||||
async fn drop_table(&self, name: &str) -> Result<()> {
|
||||
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
|
||||
let full_path = self.base_path.child(dir_name.clone());
|
||||
self.object_store
|
||||
.remove_dir_all(full_path.clone())
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
// this error is not lance::Error::DatasetNotFound,
|
||||
// as the method `remove_dir_all` may be used to remove something that is not a dataset
|
||||
lance::Error::NotFound { .. } => Error::TableNotFound {
|
||||
name: name.to_owned(),
|
||||
},
|
||||
_ => Error::from(err),
|
||||
})?;
|
||||
|
||||
let object_store_params = ObjectStoreParams {
|
||||
storage_options: Some(self.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let mut uri = self.uri.clone();
|
||||
if let Some(query_string) = &self.query_string {
|
||||
uri.push_str(&format!("?{}", query_string));
|
||||
}
|
||||
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params))
|
||||
.await
|
||||
.unwrap();
|
||||
commit_handler.delete(&full_path).await.unwrap();
|
||||
Ok(())
|
||||
self.drop_tables(vec![name.to_string()]).await
|
||||
}
|
||||
|
||||
async fn drop_all_tables(&self) -> Result<()> {
|
||||
self.object_store
|
||||
.remove_dir_all(self.base_path.clone())
|
||||
.await?;
|
||||
Ok(())
|
||||
let tables = self.table_names(TableNamesRequest::default()).await?;
|
||||
self.drop_tables(tables).await
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
|
||||
Reference in New Issue
Block a user