Compare commits

...

1 Commits

Author SHA1 Message Date
Jack Ye
333b0c7c7c fix(rust): create tables after namespace declare conflict 2026-04-19 10:12:59 -07:00

View File

@@ -239,35 +239,62 @@ impl Database for LanceNamespaceDatabase {
.filter(|o| !o.is_empty()); .filter(|o| !o.is_empty());
(loc, opts, response.managed_versioning) (loc, opts, response.managed_versioning)
} else { } else {
let response = self match self.namespace.declare_table(declare_request).await {
.namespace Ok(response) => {
.declare_table(declare_request) let loc = response.location.ok_or_else(|| Error::Runtime {
.await message: "Table location is missing from declare_table response"
.map_err(|e| { .to_string(),
let err_str = e.to_string(); })?;
if matches!(request.mode, CreateTableMode::Create) let opts = response
&& (err_str.contains("already exists") .storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o: &HashMap<String, String>| !o.is_empty());
(loc, opts, response.managed_versioning)
}
Err(e)
if matches!(request.mode, CreateTableMode::Create) && {
let err_str = e.to_string();
err_str.contains("already exists")
|| err_str.contains("TableAlreadyExists") || err_str.contains("TableAlreadyExists")
|| err_str.contains("table already exists")) || err_str.contains("table already exists")
{ } =>
Error::TableAlreadyExists { {
let response = self
.namespace
.describe_table(DescribeTableRequest {
id: Some(table_id.clone()),
..Default::default()
})
.await
.map_err(|describe_err| Error::Runtime {
message: format!(
"Failed to describe existing declared table after declare conflict: {}",
describe_err
),
})?;
if response.version.is_some() && response.schema.is_some() {
return Err(Error::TableAlreadyExists {
name: request.name.clone(), name: request.name.clone(),
} });
} else {
Error::Runtime {
message: format!("Failed to declare table: {}", e),
}
} }
})?;
let loc = response.location.ok_or_else(|| Error::Runtime { let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(), message: "Table location is missing from describe_table response"
})?; .to_string(),
// Use storage options from response, fall back to self.storage_options })?;
let opts = response let opts = response
.storage_options .storage_options
.or_else(|| Some(self.storage_options.clone())) .or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty()); .filter(|o: &HashMap<String, String>| !o.is_empty());
(loc, opts, response.managed_versioning) (loc, opts, response.managed_versioning)
}
Err(e) => {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
} }
}; };
@@ -734,6 +761,58 @@ mod tests {
assert_eq!(id_col.value(2), 30); assert_eq!(id_col.value(2), 30);
} }
#[tokio::test]
async fn test_namespace_create_table_after_declare_conflict() {
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_namespace(CreateNamespaceRequest {
id: Some(vec!["test_ns".into()]),
..Default::default()
})
.await
.expect("Failed to create namespace");
let namespace_client = conn.namespace_client().await.unwrap();
namespace_client
.declare_table(DeclareTableRequest {
id: Some(vec!["test_ns".into(), "declared_test".into()]),
..Default::default()
})
.await
.expect("Failed to declare table");
let test_data = create_test_data();
let table = conn
.create_table("declared_test", test_data)
.namespace(vec!["test_ns".into()])
.execute()
.await
.expect("Failed to create table after declare conflict");
let results = table
.query()
.execute()
.await
.expect("Failed to query table")
.try_collect::<Vec<_>>()
.await
.expect("Failed to collect results");
assert_eq!(results.len(), 1);
assert_eq!(results[0].num_rows(), 5);
assert_eq!(table.namespace(), &["test_ns"]);
assert_eq!(table.id(), "test_ns$declared_test");
}
#[tokio::test] #[tokio::test]
async fn test_namespace_create_table_exist_ok_mode() { async fn test_namespace_create_table_exist_ok_mode() {
// Setup: Create a temporary directory for the namespace // Setup: Create a temporary directory for the namespace