feat(rust): add namespace client properties

This commit is contained in:
Jack Ye
2026-04-15 22:58:05 -07:00
parent c6e43b545d
commit a3e3c1ad5c
3 changed files with 213 additions and 14 deletions

View File

@@ -582,6 +582,14 @@ pub struct ConnectRequest {
/// Database specific options
pub options: HashMap<String, String>,
/// Extra properties for the equivalent namespace client.
///
/// For a local [`ListingDatabase`], these are merged into the backing
/// `DirectoryNamespace` properties. This is useful for namespace-specific
/// settings such as `table_version_tracking_enabled` that are distinct from
/// storage options.
pub namespace_client_properties: HashMap<String, String>,
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
@@ -621,6 +629,7 @@ impl ConnectBuilder {
client_config: Default::default(),
read_consistency_interval: None,
options: HashMap::new(),
namespace_client_properties: HashMap::new(),
session: None,
},
embedding_registry: None,
@@ -757,6 +766,31 @@ impl ConnectBuilder {
self
}
/// Set an additional property for the equivalent namespace client.
pub fn namespace_client_property(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.request
.namespace_client_properties
.insert(key.into(), value.into());
self
}
/// Set multiple additional properties for the equivalent namespace client.
pub fn namespace_client_properties(
mut self,
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (key, value) in pairs {
self.request
.namespace_client_properties
.insert(key.into(), value.into());
}
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
@@ -893,6 +927,7 @@ pub struct ConnectNamespaceBuilder {
ns_impl: String,
properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
session: Option<Arc<lance::session::Session>>,
@@ -905,6 +940,7 @@ impl ConnectNamespaceBuilder {
ns_impl: ns_impl.to_string(),
properties,
storage_options: HashMap::new(),
namespace_client_properties: HashMap::new(),
read_consistency_interval: None,
embedding_registry: None,
session: None,
@@ -933,6 +969,29 @@ impl ConnectNamespaceBuilder {
self
}
/// Set an additional namespace client property.
pub fn namespace_client_property(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.namespace_client_properties
.insert(key.into(), value.into());
self
}
/// Set multiple additional namespace client properties.
pub fn namespace_client_properties(
mut self,
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (key, value) in pairs {
self.namespace_client_properties
.insert(key.into(), value.into());
}
self
}
/// The interval at which to check for updates from other processes.
///
/// If left unset, consistency is not checked. For maximum read
@@ -994,10 +1053,13 @@ impl ConnectNamespaceBuilder {
pub async fn execute(self) -> Result<Connection> {
use crate::database::namespace::LanceNamespaceDatabase;
let mut properties = self.properties;
properties.extend(self.namespace_client_properties);
let internal = Arc::new(
LanceNamespaceDatabase::connect(
&self.ns_impl,
self.properties,
properties,
self.storage_options,
self.read_consistency_interval,
self.session,
@@ -1117,6 +1179,31 @@ mod tests {
assert_eq!(db.uri(), relative_uri.to_str().unwrap().to_string());
}
#[tokio::test]
async fn test_connect_with_namespace_client_properties() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri)
.namespace_client_property("table_version_tracking_enabled", "true")
.namespace_client_property("manifest_enabled", "true")
.execute()
.await
.unwrap();
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
assert_eq!(ns_impl, "dir");
assert_eq!(properties.get("root"), Some(&uri.to_string()));
assert_eq!(
properties.get("table_version_tracking_enabled"),
Some(&"true".to_string())
);
assert_eq!(
properties.get("manifest_enabled"),
Some(&"true".to_string())
);
}
#[tokio::test]
async fn test_table_names() {
let tc = new_test_connection().await.unwrap();

View File

@@ -285,11 +285,12 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl ListingDatabase {
fn namespace_client_properties(
fn build_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
) -> HashMap<String, String> {
let mut properties = HashMap::new();
let mut properties = namespace_client_properties;
properties.insert("root".to_string(), uri.to_string());
for (key, value) in storage_options {
properties.insert(format!("storage.{}", key), value.clone());
@@ -300,10 +301,15 @@ impl ListingDatabase {
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Arc<lance::session::Session>,
) -> Result<Arc<LanceNamespaceDatabase>> {
let ns_properties = Self::namespace_client_properties(uri, &storage_options);
let ns_properties = Self::build_namespace_client_properties(
uri,
&storage_options,
namespace_client_properties,
);
Ok(Arc::new(
LanceNamespaceDatabase::connect(
"dir",
@@ -336,6 +342,7 @@ impl ListingDatabase {
uri,
request.read_consistency_interval,
options.new_table_config,
request.namespace_client_properties.clone(),
request.session.clone(),
)
.await
@@ -426,6 +433,7 @@ impl ListingDatabase {
let namespace_database = Self::connect_namespace_database(
&table_base_uri,
options.storage_options.clone(),
request.namespace_client_properties.clone(),
request.read_consistency_interval,
session.clone(),
)
@@ -450,6 +458,7 @@ impl ListingDatabase {
uri,
request.read_consistency_interval,
options.new_table_config,
request.namespace_client_properties.clone(),
request.session.clone(),
)
.await
@@ -461,6 +470,7 @@ impl ListingDatabase {
path: &str,
read_consistency_interval: Option<std::time::Duration>,
new_table_config: NewTableConfig,
namespace_client_properties: HashMap<String, String>,
session: Option<Arc<lance::session::Session>>,
) -> Result<Self> {
let session = session.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
@@ -477,6 +487,7 @@ impl ListingDatabase {
let namespace_database = Self::connect_namespace_database(
path,
HashMap::new(),
namespace_client_properties,
read_consistency_interval,
session.clone(),
)
@@ -791,18 +802,10 @@ impl Database for ListingDatabase {
self.namespace_database().describe_namespace(request).await
}
#[allow(deprecated)]
async fn table_names(&self, request: TableNamesRequest) -> Result<Vec<String>> {
if !request.namespace_path.is_empty() {
let response = self
.namespace_database()
.list_tables(ListTablesRequest {
id: Some(request.namespace_path),
page_token: request.start_after,
limit: request.limit.map(|l| l as i32),
..Default::default()
})
.await?;
return Ok(response.tables);
return self.namespace_database().table_names(request).await;
}
let mut f = self
.object_store
@@ -1154,6 +1157,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
namespace_client_properties: Default::default(),
read_consistency_interval: None,
session: None,
};
@@ -1287,6 +1291,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: options.clone(),
namespace_client_properties: Default::default(),
read_consistency_interval: None,
session: None,
};
@@ -1821,6 +1826,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
read_consistency_interval: None,
session: None,
};
@@ -1926,6 +1932,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
read_consistency_interval: None,
session: None,
};
@@ -1997,6 +2004,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
read_consistency_interval: None,
session: None,
};
@@ -2175,6 +2183,69 @@ mod tests {
.unwrap();
}
#[tokio::test]
async fn test_listing_database_with_namespace_client_properties() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
let mut namespace_client_properties = HashMap::new();
namespace_client_properties.insert(
"table_version_tracking_enabled".to_string(),
"true".to_string(),
);
namespace_client_properties.insert("manifest_enabled".to_string(), "true".to_string());
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
namespace_client_properties,
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
let namespace_path = vec!["test_ns".to_string()];
db.create_namespace(CreateNamespaceRequest {
id: Some(namespace_path.clone()),
..Default::default()
})
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
db.create_table(CreateTableRequest {
name: "managed_table".to_string(),
namespace_path: namespace_path.clone(),
data: Box::new(RecordBatch::new_empty(schema)) as Box<dyn Scannable>,
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
namespace_client: None,
})
.await
.unwrap();
let namespace_client = db.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(lance_namespace::models::DescribeTableRequest {
id: Some(vec!["test_ns".to_string(), "managed_table".to_string()]),
..Default::default()
})
.await
.unwrap();
assert_eq!(describe.managed_versioning, Some(true));
}
#[tokio::test]
async fn test_listing_database_nested_namespace_table_ops() {
let (_tempdir, db) = setup_database().await;

View File

@@ -450,6 +450,47 @@ mod tests {
));
}
#[tokio::test]
async fn test_namespace_connection_with_namespace_client_properties() {
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.namespace_client_property("table_version_tracking_enabled", "true")
.namespace_client_property("manifest_enabled", "true")
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_namespace(CreateNamespaceRequest {
id: Some(vec!["test_ns".into()]),
..Default::default()
})
.await
.expect("Failed to create namespace");
let test_data = create_test_data();
conn.create_table("test_table", test_data)
.namespace(vec!["test_ns".into()])
.execute()
.await
.expect("Failed to create table");
let namespace_client = conn.namespace_client().await.unwrap();
let describe = namespace_client
.describe_table(DescribeTableRequest {
id: Some(vec!["test_ns".into(), "test_table".into()]),
..Default::default()
})
.await
.expect("Failed to describe table");
assert_eq!(describe.managed_versioning, Some(true));
}
#[tokio::test]
async fn test_namespace_create_table_basic() {
// Setup: Create a temporary directory for the namespace