mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-24 22:50:40 +00:00
Compare commits
4 Commits
jack/rust-
...
rpgreen/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a747260f6 | ||
|
|
a0e2eb0c03 | ||
|
|
e88524ca0e | ||
|
|
f50e2f22f5 |
@@ -373,6 +373,19 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
|
||||
return JsonArrowSchema(fields=fields, metadata=meta)
|
||||
|
||||
|
||||
def _builds_namespace_natively(
|
||||
namespace_client_impl: Optional[str],
|
||||
namespace_client_properties: Optional[Dict[str, str]],
|
||||
) -> bool:
|
||||
"""Whether ``connect_namespace_client`` builds the namespace client natively
|
||||
in Rust (installing the read-freshness context provider) rather than wrapping
|
||||
the pre-built Python client.
|
||||
|
||||
Must mirror Rust ``build_namespace_natively`` in ``python/src/connection.rs``.
|
||||
"""
|
||||
return namespace_client_impl == "rest" and bool(namespace_client_properties)
|
||||
|
||||
|
||||
class LanceNamespaceDBConnection(DBConnection):
|
||||
"""
|
||||
A LanceDB connection that uses a namespace for table management.
|
||||
@@ -432,6 +445,13 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
)
|
||||
self._namespace_client_impl = namespace_client_impl
|
||||
self._namespace_client_properties = namespace_client_properties
|
||||
# When the namespace client is built natively (see Rust
|
||||
# ``build_namespace_natively``), the underlying Rust table performs
|
||||
# QueryTable pushdown through the read-freshness context provider, which
|
||||
# the pure-Python ``query_table`` path bypasses.
|
||||
self._route_pushdown_to_rust = _builds_namespace_natively(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
self._inner = AsyncConnection(
|
||||
_connect_namespace_client(
|
||||
namespace_client,
|
||||
@@ -543,6 +563,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace_path=namespace_path,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
route_pushdown_to_rust=self._route_pushdown_to_rust,
|
||||
_async=async_table,
|
||||
)
|
||||
|
||||
@@ -580,6 +601,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace_path=namespace_path,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
route_pushdown_to_rust=self._route_pushdown_to_rust,
|
||||
_async=async_table,
|
||||
)
|
||||
if branch is not None:
|
||||
|
||||
@@ -2022,6 +2022,7 @@ class LanceTable(Table):
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
route_pushdown_to_rust: bool = False,
|
||||
_async: AsyncTable = None,
|
||||
):
|
||||
if namespace_path is None:
|
||||
@@ -2031,6 +2032,14 @@ class LanceTable(Table):
|
||||
self._location = location # Store location for use in _dataset_path
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
# When the connection built the namespace client natively (e.g. an
|
||||
# enterprise "rest" connection), the underlying Rust table already
|
||||
# executes QueryTable pushdown itself -- and, unlike this Python urllib3
|
||||
# path, it routes through the read-freshness context provider that emits
|
||||
# the ``x-lancedb-min-timestamp`` header. So we must defer pushdown to
|
||||
# Rust instead of calling the Python ``namespace_client.query_table``
|
||||
# directly, or reads silently bypass read-freshness (stale results).
|
||||
self._route_pushdown_to_rust = route_pushdown_to_rust
|
||||
if _async is not None:
|
||||
self._table = _async
|
||||
else:
|
||||
@@ -2241,6 +2250,7 @@ class LanceTable(Table):
|
||||
namespace_path=self._namespace_path,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._pushdown_operations,
|
||||
route_pushdown_to_rust=self._route_pushdown_to_rust,
|
||||
location=self._location,
|
||||
_async=async_table,
|
||||
)
|
||||
@@ -2391,8 +2401,11 @@ class LanceTable(Table):
|
||||
Returns
|
||||
-------
|
||||
pa.Table"""
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
if (
|
||||
_should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
)
|
||||
and not self._route_pushdown_to_rust
|
||||
):
|
||||
return self._execute_query(Query()).read_all()
|
||||
|
||||
@@ -3344,6 +3357,7 @@ class LanceTable(Table):
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
route_pushdown_to_rust: bool = False,
|
||||
):
|
||||
"""
|
||||
Create a new table.
|
||||
@@ -3406,6 +3420,7 @@ class LanceTable(Table):
|
||||
self._location = location
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
self._route_pushdown_to_rust = route_pushdown_to_rust
|
||||
|
||||
if data_storage_version is not None:
|
||||
warnings.warn(
|
||||
@@ -3519,6 +3534,7 @@ class LanceTable(Table):
|
||||
_should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
)
|
||||
and not self._route_pushdown_to_rust
|
||||
and self.current_branch() is None
|
||||
):
|
||||
from lancedb.namespace import _execute_server_side_query
|
||||
@@ -4260,6 +4276,7 @@ class AsyncTable:
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
route_pushdown_to_rust: bool = False,
|
||||
):
|
||||
"""Create a new AsyncTable object.
|
||||
|
||||
@@ -4272,6 +4289,9 @@ class AsyncTable:
|
||||
self._namespace_path = namespace_path or []
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
# See LanceTable.__init__: defer QueryTable pushdown to Rust (which emits
|
||||
# the read-freshness header) for natively-built namespace clients.
|
||||
self._route_pushdown_to_rust = route_pushdown_to_rust
|
||||
|
||||
def _set_namespace_context(
|
||||
self,
|
||||
@@ -4279,10 +4299,12 @@ class AsyncTable:
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
pushdown_operations: Optional[set] = None,
|
||||
route_pushdown_to_rust: bool = False,
|
||||
) -> "AsyncTable":
|
||||
self._namespace_path = namespace_path or []
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
self._route_pushdown_to_rust = route_pushdown_to_rust
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
@@ -4492,8 +4514,11 @@ class AsyncTable:
|
||||
-------
|
||||
pa.Table
|
||||
"""
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
if (
|
||||
_should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
)
|
||||
and not self._route_pushdown_to_rust
|
||||
):
|
||||
return (await self._execute_query(Query())).read_all()
|
||||
|
||||
@@ -5177,8 +5202,11 @@ class AsyncTable:
|
||||
batch_size: Optional[int] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
) -> pa.RecordBatchReader:
|
||||
if _should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
if (
|
||||
_should_push_down_query_table(
|
||||
self._namespace_client, self._pushdown_operations
|
||||
)
|
||||
and not self._route_pushdown_to_rust
|
||||
):
|
||||
from lancedb.namespace import _execute_server_side_query
|
||||
|
||||
|
||||
@@ -65,6 +65,9 @@ def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable:
|
||||
table._namespace_path = ["geneva"]
|
||||
table._namespace_client = namespace_client
|
||||
table._pushdown_operations = {"QueryTable"}
|
||||
# This test exercises the Python-side pushdown path (non-native client), so
|
||||
# pushdown is not routed to Rust.
|
||||
table._route_pushdown_to_rust = False
|
||||
return table
|
||||
|
||||
|
||||
|
||||
@@ -610,24 +610,38 @@ pub fn connect_namespace_client(
|
||||
namespace_client_impl: Option<String>,
|
||||
namespace_client_properties: Option<HashMap<String, String>>,
|
||||
) -> PyResult<Connection> {
|
||||
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
||||
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
||||
let namespace_client_pushdown_operations =
|
||||
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
|
||||
let ns_properties = namespace_client_properties.unwrap_or_default();
|
||||
let storage_options = storage_options.unwrap_or_default();
|
||||
let session = session.map(|s| s.inner.clone());
|
||||
|
||||
let database = LanceNamespaceDatabase::from_namespace_client(
|
||||
namespace_client,
|
||||
ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
);
|
||||
// Prefer building the namespace natively from (impl, properties) so the
|
||||
// read-freshness provider installed
|
||||
let database = if build_namespace_natively(namespace_client_impl.as_deref(), &ns_properties) {
|
||||
let ns_impl = namespace_client_impl.expect("impl present per build_namespace_natively");
|
||||
crate::runtime::block_on(LanceNamespaceDatabase::connect(
|
||||
&ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
))
|
||||
.infer_error()?
|
||||
} else {
|
||||
let namespace_client = extract_namespace_arc(py, namespace_client)?;
|
||||
LanceNamespaceDatabase::from_namespace_client(
|
||||
namespace_client,
|
||||
namespace_client_impl.unwrap_or_else(|| "python".to_string()),
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
namespace_client_pushdown_operations,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Connection::new(LanceConnection::new(
|
||||
Arc::new(database),
|
||||
@@ -635,6 +649,16 @@ pub fn connect_namespace_client(
|
||||
)))
|
||||
}
|
||||
|
||||
/// Whether to build the namespace natively (from impl + properties) instead of
|
||||
/// wrapping a pre-built client. Native construction is required for the
|
||||
/// read-freshness provider to be installed
|
||||
fn build_namespace_natively(
|
||||
namespace_client_impl: Option<&str>,
|
||||
namespace_client_properties: &HashMap<String, String>,
|
||||
) -> bool {
|
||||
matches!(namespace_client_impl, Some("rest")) && !namespace_client_properties.is_empty()
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
pub struct PyClientConfig {
|
||||
user_agent: String,
|
||||
@@ -733,3 +757,36 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
|
||||
pairs
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn native_build_only_for_rest_with_properties() {
|
||||
let rest = props(&[("uri", "http://localhost:10024")]);
|
||||
|
||||
// rest + non-empty properties -> build natively (installs the
|
||||
// read-freshness provider so checkout_latest() busts the server cache).
|
||||
assert!(build_namespace_natively(Some("rest"), &rest));
|
||||
|
||||
// dir is local (no server cache) -> wrap the pre-built client unchanged.
|
||||
assert!(!build_namespace_natively(
|
||||
Some("dir"),
|
||||
&props(&[("root", "/tmp")])
|
||||
));
|
||||
|
||||
// No impl: only a pre-built client was handed in -> wrap it as-is.
|
||||
assert!(!build_namespace_natively(None, &rest));
|
||||
|
||||
// rest but no properties: nothing to build a connection from -> wrap.
|
||||
assert!(!build_namespace_natively(Some("rest"), &HashMap::new()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,15 @@ fn get_runtime() -> &'static runtime::Runtime {
|
||||
unsafe { &*new_ptr }
|
||||
}
|
||||
|
||||
/// Block the current thread on a future using the shared runtime.
|
||||
///
|
||||
/// For sync `#[pyfunction]`s that need to drive an async operation (e.g.
|
||||
/// building a namespace client). Must not be called from within the runtime's
|
||||
/// own worker threads.
|
||||
pub fn block_on<F: std::future::Future>(fut: F) -> F::Output {
|
||||
get_runtime().block_on(fut)
|
||||
}
|
||||
|
||||
/// Runs in async-signal context after `fork()` in the child. We can only
|
||||
/// touch atomics here; we deliberately leak the previous runtime because
|
||||
/// dropping a tokio `Runtime` would try to join its (now-dead) worker
|
||||
|
||||
@@ -579,24 +579,45 @@ fn array_to_f32_vec(arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Magic bytes that prefix (and suffix) the Arrow IPC *file* format.
|
||||
const ARROW_IPC_FILE_MAGIC: &[u8] = b"ARROW1";
|
||||
|
||||
/// Parse Arrow IPC response from the namespace server.
|
||||
///
|
||||
/// The server may return either the Arrow IPC *file* format or the *stream*
|
||||
/// format. REST/phalanx returns the file format (it begins with the `ARROW1`
|
||||
/// magic); reading that with a `StreamReader` fails with "failed to fill whole
|
||||
/// buffer". Detect the magic and pick the matching reader so both are handled.
|
||||
async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result<DatasetRecordBatchStream> {
|
||||
use arrow_ipc::reader::StreamReader;
|
||||
use arrow_ipc::reader::{FileReader, StreamReader};
|
||||
use std::io::Cursor;
|
||||
|
||||
let cursor = Cursor::new(bytes);
|
||||
let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse Arrow IPC response: {}", e),
|
||||
})?;
|
||||
|
||||
// Collect all record batches
|
||||
let schema = reader.schema();
|
||||
let batches: Vec<_> = reader
|
||||
.into_iter()
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read Arrow IPC batches: {}", e),
|
||||
let (schema, batches) = if bytes.starts_with(ARROW_IPC_FILE_MAGIC) {
|
||||
let reader = FileReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse Arrow IPC file response: {}", e),
|
||||
})?;
|
||||
let schema = reader.schema();
|
||||
let batches = reader
|
||||
.into_iter()
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read Arrow IPC file batches: {}", e),
|
||||
})?;
|
||||
(schema, batches)
|
||||
} else {
|
||||
let reader =
|
||||
StreamReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse Arrow IPC response: {}", e),
|
||||
})?;
|
||||
let schema = reader.schema();
|
||||
let batches = reader
|
||||
.into_iter()
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read Arrow IPC batches: {}", e),
|
||||
})?;
|
||||
(schema, batches)
|
||||
};
|
||||
|
||||
// Create a stream from the batches
|
||||
let stream = futures::stream::iter(batches.into_iter().map(Ok));
|
||||
@@ -624,6 +645,59 @@ mod tests {
|
||||
FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_parse_arrow_ipc_response_handles_file_and_stream() {
|
||||
use arrow_array::{Int32Array, RecordBatch};
|
||||
use arrow_ipc::writer::{FileWriter, StreamWriter};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Arrow IPC *file* format -- what REST/phalanx returns. Previously this
|
||||
// failed with "failed to fill whole buffer" because we used a StreamReader.
|
||||
let mut file_buf = Vec::new();
|
||||
{
|
||||
let mut writer = FileWriter::try_new(&mut file_buf, &schema).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.finish().unwrap();
|
||||
}
|
||||
assert!(file_buf.starts_with(ARROW_IPC_FILE_MAGIC));
|
||||
let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(file_buf))
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|b| b.num_rows())
|
||||
.sum();
|
||||
assert_eq!(rows, 3);
|
||||
|
||||
// Arrow IPC *stream* format must still parse.
|
||||
let mut stream_buf = Vec::new();
|
||||
{
|
||||
let mut writer = StreamWriter::try_new(&mut stream_buf, &schema).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.finish().unwrap();
|
||||
}
|
||||
assert!(!stream_buf.starts_with(ARROW_IPC_FILE_MAGIC));
|
||||
let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(stream_buf))
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|b| b.num_rows())
|
||||
.sum();
|
||||
assert_eq!(rows, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_to_namespace_query_vector() {
|
||||
let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));
|
||||
|
||||
Reference in New Issue
Block a user