mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-04 11:30:46 +00:00
feat: add table branch support (#3490)
### Description
Adds first-class support for table branches across the Rust core and the
Python and TypeScript SDKs.
Rust
```rust
use lance::dataset::refs::Ref;
// Create a branch from main and write to it — main is untouched.
let exp = table.create_branch("exp", Ref::Version(None, None)).await?;
exp.add(batches).await?;
// Reopen the branch later: check out from a table, or open it directly.
let exp = table.checkout_branch("exp").await?;
let exp = db.open_table("items").branch("exp").execute().await?;
let branches = table.list_branches().await?;
table.delete_branch("exp").await?;
```
Python
```python
# Create a branch from main and write to it
branch = await table.branches.create("exp", from_ref="main")
await branch.add(data)
# Reopen the branch later: check out from a table, or open it directly.
branch = await table.branches.checkout("exp")
branch = await db.open_table("items", branch="exp")
await table.branches.list()
await table.branches.delete("exp")
```
TypeScript
```typescript
const branches = await table.branches();
// Create a branch from main and write to it
const branch = await branches.create("exp");
await branch.add(data);
// Reopen the branch later: check out from a table, or open it directly.
const checkedOut = await branches.checkout("exp");
const opened = await db.openTable("items", undefined, { branch: "exp" });
await branches.list();
await branches.delete("exp");
```
### Testing
- Added unit tests
- ran smoke tests against python and typescript sdks on local machine
### Next steps
- Add RemoteTable support
- Add Branch Comparison support
- Merge Branching support
This commit is contained in:
@@ -119,6 +119,7 @@ pub struct OpenTableBuilder {
|
||||
parent: Arc<dyn Database>,
|
||||
request: OpenTableRequest,
|
||||
embedding_registry: Arc<dyn EmbeddingRegistry>,
|
||||
branch: Option<String>,
|
||||
}
|
||||
|
||||
impl OpenTableBuilder {
|
||||
@@ -139,6 +140,7 @@ impl OpenTableBuilder {
|
||||
managed_versioning: None,
|
||||
},
|
||||
embedding_registry,
|
||||
branch: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,14 +261,22 @@ impl OpenTableBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table scoped to the given branch instead of the default branch.
|
||||
///
|
||||
/// Reads and writes on the returned table operate in the branch's context.
|
||||
pub fn branch(mut self, branch: impl Into<String>) -> Self {
|
||||
self.branch = Some(branch.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
let table = self.parent.open_table(self.request).await?;
|
||||
Ok(Table::new_with_embedding_registry(
|
||||
table,
|
||||
self.parent,
|
||||
self.embedding_registry,
|
||||
))
|
||||
let table = Table::new_with_embedding_registry(table, self.parent, self.embedding_registry);
|
||||
match self.branch {
|
||||
Some(branch) => table.checkout_branch(&branch).await,
|
||||
None => Ok(table),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -740,6 +740,64 @@ mod tests {
|
||||
assert!(table_names.contains(&"test_table".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_branch_query_under_pushdown_stays_local() {
|
||||
// With QueryTable pushdown enabled, a query on the main branch routes to
|
||||
// the namespace server, but a branch handle must run locally: the
|
||||
// server-side request carries no branch and would return main's rows.
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.pushdown_operation(NamespaceClientPushdownOperation::QueryTable)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
conn.create_namespace(CreateNamespaceRequest {
|
||||
id: Some(vec!["test_ns".into()]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("Failed to create namespace");
|
||||
|
||||
// main has 5 rows
|
||||
let table = conn
|
||||
.create_table("ref_test", create_test_data())
|
||||
.namespace(vec!["test_ns".into()])
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// fork a branch off main, then add 5 more rows so it differs from main
|
||||
let branch = table
|
||||
.create_branch("exp", main_version)
|
||||
.await
|
||||
.expect("Failed to create branch");
|
||||
branch
|
||||
.add(create_test_data())
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to append to branch");
|
||||
|
||||
// the branch query must run locally and see the branch's 10 rows --
|
||||
// not get routed to the server (which carries no branch) and see main's 5
|
||||
let results = branch
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query branch")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
let count: usize = results.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(count, 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_describe_table() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
|
||||
@@ -1384,6 +1384,38 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
.map_err(unwrap_shared_error)
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
_name: &str,
|
||||
_from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, _name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, lance::dataset::refs::BranchContents>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, _name: &str) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
|
||||
pub use chrono::Duration;
|
||||
pub use delete::DeleteResult;
|
||||
use futures::future::join_all;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::refs::{BranchContents, Ref, TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
@@ -625,6 +625,20 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn restore(&self) -> Result<()>;
|
||||
/// List the versions of the table.
|
||||
async fn list_versions(&self) -> Result<Vec<Version>>;
|
||||
/// Create a new branch from `from` and return a handle scoped to it.
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>>;
|
||||
/// List the branches of the table.
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>>;
|
||||
/// Delete a branch.
|
||||
async fn delete_branch(&self, name: &str) -> Result<()>;
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
fn current_branch(&self) -> Option<String>;
|
||||
/// Get the table definition.
|
||||
async fn table_definition(&self) -> Result<TableDefinition>;
|
||||
/// Get the table URI (storage location)
|
||||
@@ -1625,6 +1639,45 @@ impl Table {
|
||||
self.inner.tags().await
|
||||
}
|
||||
|
||||
/// Create a new branch from `from` (a version, tag, or branch)
|
||||
pub async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: impl Into<lance::dataset::refs::Ref>,
|
||||
) -> Result<Self> {
|
||||
let inner = self.inner.create_branch(name, from.into()).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
pub async fn checkout_branch(&self, name: &str) -> Result<Self> {
|
||||
let inner = self.inner.checkout_branch(name).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// List the branches of the table.
|
||||
pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
self.inner.list_branches().await
|
||||
}
|
||||
|
||||
/// Delete a branch.
|
||||
pub async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
self.inner.delete_branch(name).await
|
||||
}
|
||||
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.inner.current_branch()
|
||||
}
|
||||
|
||||
/// Retrieve statistics on the table
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
self.inner.stats().await
|
||||
@@ -1861,6 +1914,30 @@ impl NativeTable {
|
||||
self
|
||||
}
|
||||
|
||||
/// Build a sibling `NativeTable` with the same identity but a different
|
||||
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
||||
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
||||
Self {
|
||||
name: self.name.clone(),
|
||||
namespace: self.namespace.clone(),
|
||||
id: self.id.clone(),
|
||||
uri: self.uri.clone(),
|
||||
dataset,
|
||||
read_consistency_interval: self.read_consistency_interval,
|
||||
namespace_client: self.namespace_client.clone(),
|
||||
pushdown_operations: self.pushdown_operations.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_branch_name(name: &str, field: &str) -> Result<()> {
|
||||
if name.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!("{field} must be a non-empty string"),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Opens an existing Table using a namespace client.
|
||||
///
|
||||
/// This method uses `DatasetBuilder::from_namespace` to open the table, which
|
||||
@@ -2652,6 +2729,49 @@ impl BaseTable for NativeTable {
|
||||
self.dataset.reload().await
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
if let lance::dataset::refs::Ref::Version(Some(from_branch), _) = &from {
|
||||
Self::validate_branch_name(from_branch, "from_ref")?;
|
||||
}
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
let branch_ds = ds.create_branch(name, from, None).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
let branch_ds = self.dataset.get().await?.checkout_branch(name).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
Ok(self.dataset.get().await?.list_branches().await?)
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
ds.delete_branch(name).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
self.dataset.current_branch()
|
||||
}
|
||||
|
||||
async fn list_versions(&self) -> Result<Vec<Version>> {
|
||||
Ok(self.dataset.get().await?.versions().await?)
|
||||
}
|
||||
@@ -3370,6 +3490,171 @@ mod tests {
|
||||
assert_eq!(table.version().await.unwrap(), 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branches() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// main: one row at v1
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 1);
|
||||
assert_eq!(table.current_branch(), None);
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// branch off main's current version; it starts with main's data
|
||||
let branch = table.create_branch("exp", main_version).await.unwrap();
|
||||
assert_eq!(branch.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// writes on the branch are isolated from main
|
||||
branch.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 2);
|
||||
assert_eq!(
|
||||
table.count_rows(None).await.unwrap(),
|
||||
1,
|
||||
"main must be untouched by branch writes"
|
||||
);
|
||||
|
||||
// the branch shows up in the listing
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(branches.contains_key("exp"));
|
||||
|
||||
// checking out the branch from the main handle sees the branch's latest data
|
||||
let checked_out = table.checkout_branch("exp").await.unwrap();
|
||||
assert_eq!(checked_out.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(checked_out.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// open_table(...).branch(...) opens directly onto the branch
|
||||
let opened = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(opened.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(opened.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// delete removes it from the listing
|
||||
table.delete_branch("exp").await.unwrap();
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(!branches.contains_key("exp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_name_validation() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// every entry point rejects an empty name instead of passing it down
|
||||
assert!(matches!(
|
||||
table.create_branch("", 1u64).await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
table.checkout_branch("").await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
table.delete_branch("").await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
// an empty source branch is rejected too
|
||||
assert!(matches!(
|
||||
table
|
||||
.create_branch(
|
||||
"ok",
|
||||
lance::dataset::refs::Ref::Version(Some(String::new()), None)
|
||||
)
|
||||
.await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_handle_tracks_concurrent_writes() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
// interval = 0 so every read checks storage for new commits
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let v1 = table.version().await.unwrap();
|
||||
|
||||
// two independent handles on the same branch
|
||||
let writer = table.create_branch("exp", v1).await.unwrap();
|
||||
let reader = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// a concurrent write on the branch is visible to the other handle, which
|
||||
// tracks the branch's HEAD (not main's)
|
||||
writer.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 2);
|
||||
// main is untouched
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_handle_without_consistency_interval_is_pinned() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
// default interval (None): handles do not auto-refresh
|
||||
let conn = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let v1 = table.version().await.unwrap();
|
||||
|
||||
let writer = table.create_branch("exp", v1).await.unwrap();
|
||||
let reader = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// without a consistency interval the reader stays on the version it
|
||||
// opened, exactly like a main-branch handle...
|
||||
writer.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// ...until it explicitly refreshes
|
||||
reader.checkout_latest().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index() {
|
||||
use arrow_array::RecordBatch;
|
||||
|
||||
@@ -144,8 +144,19 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
|
||||
/// Checkout a branch and track its HEAD for new versions.
|
||||
pub async fn as_branch(&self, _branch: impl Into<String>) -> Result<()> {
|
||||
todo!("Branch support not yet implemented")
|
||||
pub async fn as_branch(&self, branch: impl Into<String>) -> Result<()> {
|
||||
let branch = branch.into();
|
||||
let dataset = { self.state.lock()?.dataset.clone() };
|
||||
let new_dataset = dataset.checkout_branch(&branch).await?;
|
||||
|
||||
let mut state = self.state.lock()?;
|
||||
state.dataset = Arc::new(new_dataset);
|
||||
state.pinned_version = None;
|
||||
drop(state);
|
||||
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
|
||||
bg_cache.invalidate();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the dataset is in a mutable mode (Latest).
|
||||
@@ -161,6 +172,17 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// The branch this wrapper is currently tracking, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.state
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.dataset
|
||||
.manifest()
|
||||
.branch
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Returns the version, if in time travel mode, or None otherwise.
|
||||
pub fn time_travel_version(&self) -> Option<u64> {
|
||||
self.state
|
||||
@@ -737,4 +759,31 @@ mod tests {
|
||||
let result = wrapper.reload().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_as_branch_is_writable_and_tracked() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
// v1 on main, then shallow-clone a branch off it
|
||||
let mut ds = create_test_dataset(uri).await;
|
||||
let v1 = ds.version().version;
|
||||
ds.create_branch("exp", v1, None).await.unwrap();
|
||||
|
||||
// wrapper starts on main: latest, writable, no branch
|
||||
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||
assert_eq!(wrapper.current_branch(), None);
|
||||
|
||||
// switch to the branch
|
||||
wrapper.as_branch("exp").await.unwrap();
|
||||
assert_eq!(wrapper.current_branch().as_deref(), Some("exp"));
|
||||
|
||||
// a branch is writable (unlike a pinned/time-travel checkout)
|
||||
wrapper.ensure_mutable().unwrap();
|
||||
assert_eq!(wrapper.time_travel_version(), None);
|
||||
|
||||
// get() returns the branch dataset
|
||||
let on_branch = wrapper.get().await.unwrap();
|
||||
assert_eq!(on_branch.manifest().branch.as_deref(), Some("exp"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,11 +41,14 @@ pub async fn execute_query(
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
|
||||
// QueryTable pushdown runs the query server-side, but only on the main
|
||||
// branch: the namespace request carries no branch yet, so a branch handle
|
||||
// must fall through to local execution.
|
||||
if table
|
||||
.pushdown_operations
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
&& table.dataset.current_branch().is_none()
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user