mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-03 02:50:41 +00:00
fix: ensure read freshness provider is built into namespace client
This commit is contained in:
@@ -610,24 +610,38 @@ pub fn connect_namespace_client(
|
||||
namespace_client_impl: Option<String>,
|
||||
namespace_client_properties: Option<HashMap<String, String>>,
|
||||
) -> PyResult<Connection> {
|
||||
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
||||
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
||||
let namespace_client_pushdown_operations =
|
||||
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
|
||||
let ns_properties = namespace_client_properties.unwrap_or_default();
|
||||
let storage_options = storage_options.unwrap_or_default();
|
||||
let session = session.map(|s| s.inner.clone());
|
||||
|
||||
let database = LanceNamespaceDatabase::from_namespace_client(
|
||||
namespace_client,
|
||||
ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
);
|
||||
// Prefer building the namespace natively from (impl, properties) so the
|
||||
// read-freshness provider installed
|
||||
let database = if build_namespace_natively(namespace_client_impl.as_deref(), &ns_properties) {
|
||||
let ns_impl = namespace_client_impl.expect("impl present per build_namespace_natively");
|
||||
crate::runtime::block_on(LanceNamespaceDatabase::connect(
|
||||
&ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
))
|
||||
.infer_error()?
|
||||
} else {
|
||||
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
||||
LanceNamespaceDatabase::from_namespace_client(
|
||||
namespace_client,
|
||||
namespace_client_impl.unwrap_or_else(|| "python".to_string()),
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Connection::new(LanceConnection::new(
|
||||
Arc::new(database),
|
||||
@@ -635,6 +649,16 @@ pub fn connect_namespace_client(
|
||||
)))
|
||||
}
|
||||
|
||||
/// Whether to build the namespace natively (from impl + properties) instead of
|
||||
/// wrapping a pre-built client. Native construction is required for the
|
||||
/// read-freshness provider to be installed
|
||||
fn build_namespace_natively(
|
||||
namespace_client_impl: Option<&str>,
|
||||
namespace_client_properties: &HashMap<String, String>,
|
||||
) -> bool {
|
||||
namespace_client_impl.is_some() && !namespace_client_properties.is_empty()
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
pub struct PyClientConfig {
|
||||
user_agent: String,
|
||||
@@ -733,3 +757,34 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
|
||||
pairs
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn native_build_requires_impl_and_properties() {
|
||||
let rest = props(&[("uri", "http://localhost:10024")]);
|
||||
|
||||
// impl + non-empty properties -> build natively (installs the
|
||||
// read-freshness provider so checkout_latest() busts server caches).
|
||||
assert!(build_namespace_natively(Some("rest"), &rest));
|
||||
assert!(build_namespace_natively(
|
||||
Some("dir"),
|
||||
&props(&[("root", "/tmp")])
|
||||
));
|
||||
|
||||
// No impl: only a pre-built client was handed in -> wrap it as-is.
|
||||
assert!(!build_namespace_natively(None, &rest));
|
||||
|
||||
// Impl but no properties: nothing to build a connection from -> wrap.
|
||||
assert!(!build_namespace_natively(Some("rest"), &HashMap::new()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,15 @@ fn get_runtime() -> &'static runtime::Runtime {
|
||||
unsafe { &*new_ptr }
|
||||
}
|
||||
|
||||
/// Block the current thread on a future using the shared runtime.
|
||||
///
|
||||
/// For sync `#[pyfunction]`s that need to drive an async operation (e.g.
|
||||
/// building a namespace client). Must not be called from within the runtime's
|
||||
/// own worker threads.
|
||||
pub fn block_on<F: std::future::Future>(fut: F) -> F::Output {
|
||||
get_runtime().block_on(fut)
|
||||
}
|
||||
|
||||
/// Runs in async-signal context after `fork()` in the child. We can only
|
||||
/// touch atomics here; we deliberately leak the previous runtime because
|
||||
/// dropping a tokio `Runtime` would try to join its (now-dead) worker
|
||||
|
||||
Reference in New Issue
Block a user