mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-03 04:10:41 +00:00
Compare commits
7 Commits
yang/fix-q
...
feat/table
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a7a7350eb3 | ||
|
|
c3c2887c02 | ||
|
|
2ca6d41f17 | ||
|
|
341cb04c2f | ||
|
|
0d4cb346f9 | ||
|
|
379684391e | ||
|
|
d065be0474 |
43
docs/src/js/classes/BranchContents.md
Normal file
43
docs/src/js/classes/BranchContents.md
Normal file
@@ -0,0 +1,43 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / BranchContents
|
||||
|
||||
# Class: BranchContents
|
||||
|
||||
## Constructors
|
||||
|
||||
### new BranchContents()
|
||||
|
||||
```ts
|
||||
new BranchContents(): BranchContents
|
||||
```
|
||||
|
||||
#### Returns
|
||||
|
||||
[`BranchContents`](BranchContents.md)
|
||||
|
||||
## Properties
|
||||
|
||||
### manifestSize
|
||||
|
||||
```ts
|
||||
manifestSize: number;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### parentBranch?
|
||||
|
||||
```ts
|
||||
optional parentBranch: string;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### parentVersion
|
||||
|
||||
```ts
|
||||
parentVersion: number;
|
||||
```
|
||||
90
docs/src/js/classes/Branches.md
Normal file
90
docs/src/js/classes/Branches.md
Normal file
@@ -0,0 +1,90 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / Branches
|
||||
|
||||
# Class: Branches
|
||||
|
||||
Branch manager for a [Table](Table.md).
|
||||
|
||||
Unlike tags, `create` and `checkout` return a new [Table](Table.md) handle scoped
|
||||
to the branch; writes on it do not affect `main`.
|
||||
|
||||
## Methods
|
||||
|
||||
### checkout()
|
||||
|
||||
```ts
|
||||
checkout(name): Promise<Table>
|
||||
```
|
||||
|
||||
Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Table`](Table.md)>
|
||||
|
||||
***
|
||||
|
||||
### create()
|
||||
|
||||
```ts
|
||||
create(
|
||||
name,
|
||||
fromRef?,
|
||||
fromVersion?): Promise<Table>
|
||||
```
|
||||
|
||||
Create a branch and return a handle scoped to it.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
Name of the new branch.
|
||||
|
||||
* **fromRef?**: `string`
|
||||
Source branch to fork from. Defaults to `main`.
|
||||
|
||||
* **fromVersion?**: `number`
|
||||
A specific version on `fromRef`. Defaults to latest.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Table`](Table.md)>
|
||||
|
||||
***
|
||||
|
||||
### delete()
|
||||
|
||||
```ts
|
||||
delete(name): Promise<void>
|
||||
```
|
||||
|
||||
Delete a branch.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### list()
|
||||
|
||||
```ts
|
||||
list(): Promise<Record<string, BranchContents>>
|
||||
```
|
||||
|
||||
List all branches, mapping name to branch metadata.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`Record`<`string`, [`BranchContents`](BranchContents.md)>>
|
||||
@@ -110,6 +110,23 @@ containing the new version number of the table after altering the columns.
|
||||
|
||||
***
|
||||
|
||||
### branches()
|
||||
|
||||
```ts
|
||||
abstract branches(): Promise<Branches>
|
||||
```
|
||||
|
||||
Get the branch manager for this table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect `main`.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Branches`](Branches.md)>
|
||||
|
||||
***
|
||||
|
||||
### checkout()
|
||||
|
||||
```ts
|
||||
@@ -994,6 +1011,29 @@ based on the row being updated (e.g. "my_col + 1")
|
||||
|
||||
***
|
||||
|
||||
### updateFieldMetadata()
|
||||
|
||||
```ts
|
||||
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
|
||||
```
|
||||
|
||||
Update per-field (column) metadata.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
|
||||
One or more per-field updates. Each
|
||||
update's metadata is merged into the field's existing metadata by default;
|
||||
a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)>
|
||||
|
||||
resolves to the new table version.
|
||||
|
||||
***
|
||||
|
||||
### vectorSearch()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
|
||||
- [BooleanQuery](classes/BooleanQuery.md)
|
||||
- [BoostQuery](classes/BoostQuery.md)
|
||||
- [BranchContents](classes/BranchContents.md)
|
||||
- [Branches](classes/Branches.md)
|
||||
- [Connection](classes/Connection.md)
|
||||
- [HeaderProvider](classes/HeaderProvider.md)
|
||||
- [Index](classes/Index.md)
|
||||
@@ -65,6 +67,7 @@
|
||||
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
|
||||
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
|
||||
- [ExecutableQuery](interfaces/ExecutableQuery.md)
|
||||
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
|
||||
- [FragmentStatistics](interfaces/FragmentStatistics.md)
|
||||
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
|
||||
- [FtsOptions](interfaces/FtsOptions.md)
|
||||
@@ -101,6 +104,7 @@
|
||||
- [TimeoutConfig](interfaces/TimeoutConfig.md)
|
||||
- [TlsConfig](interfaces/TlsConfig.md)
|
||||
- [TokenResponse](interfaces/TokenResponse.md)
|
||||
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
|
||||
- [UpdateOptions](interfaces/UpdateOptions.md)
|
||||
- [UpdateResult](interfaces/UpdateResult.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
|
||||
41
docs/src/js/interfaces/FieldMetadataUpdate.md
Normal file
41
docs/src/js/interfaces/FieldMetadataUpdate.md
Normal file
@@ -0,0 +1,41 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
|
||||
|
||||
# Interface: FieldMetadataUpdate
|
||||
|
||||
A per-field metadata update, addressed by dot-path.
|
||||
|
||||
## Properties
|
||||
|
||||
### metadata
|
||||
|
||||
```ts
|
||||
metadata: Record<string, null | string>;
|
||||
```
|
||||
|
||||
Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
default; a value of `null` deletes that key.
|
||||
|
||||
***
|
||||
|
||||
### path
|
||||
|
||||
```ts
|
||||
path: string;
|
||||
```
|
||||
|
||||
Dot-separated path to the field. For a top-level column this is just its
|
||||
name; for a nested field it's the path, e.g. "a.b.c".
|
||||
|
||||
***
|
||||
|
||||
### replace?
|
||||
|
||||
```ts
|
||||
optional replace: boolean;
|
||||
```
|
||||
|
||||
If true, replace the field's entire metadata map instead of merging.
|
||||
15
docs/src/js/interfaces/UpdateFieldMetadataResult.md
Normal file
15
docs/src/js/interfaces/UpdateFieldMetadataResult.md
Normal file
@@ -0,0 +1,15 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
|
||||
|
||||
# Interface: UpdateFieldMetadataResult
|
||||
|
||||
## Properties
|
||||
|
||||
### version
|
||||
|
||||
```ts
|
||||
version: number;
|
||||
```
|
||||
@@ -85,6 +85,39 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
await expect(table.countRows()).resolves.toBe(3);
|
||||
});
|
||||
|
||||
it("should support branches", async () => {
|
||||
await table.add([{ id: 1 }]);
|
||||
expect(await table.countRows()).toBe(1);
|
||||
|
||||
// fork an isolated, writable branch from main
|
||||
const branch = await (await table.branches()).create("exp");
|
||||
expect(await branch.countRows()).toBe(1);
|
||||
await branch.add([{ id: 2 }]);
|
||||
expect(await branch.countRows()).toBe(2);
|
||||
// main is untouched by branch writes
|
||||
expect(await table.countRows()).toBe(1);
|
||||
|
||||
// listed, with main (null) as the parent
|
||||
const list = await (await table.branches()).list();
|
||||
expect(Object.keys(list)).toContain("exp");
|
||||
expect(list["exp"].parentBranch).toBeNull();
|
||||
|
||||
// fromRef="main" is equivalent to the default
|
||||
await (await table.branches()).create("exp2", "main");
|
||||
const list2 = await (await table.branches()).list();
|
||||
expect(list2["exp2"].parentBranch).toBeNull();
|
||||
|
||||
// checkout returns a handle scoped to the branch's latest
|
||||
const checkedOut = await (await table.branches()).checkout("exp");
|
||||
expect(await checkedOut.countRows()).toBe(2);
|
||||
|
||||
// delete removes it
|
||||
await (await table.branches()).delete("exp");
|
||||
await (await table.branches()).delete("exp2");
|
||||
const after = await (await table.branches()).list();
|
||||
expect(Object.keys(after)).not.toContain("exp");
|
||||
});
|
||||
|
||||
it("should show table stats", async () => {
|
||||
await table.add([{ id: 1 }, { id: 2 }]);
|
||||
await table.add([{ id: 1 }]);
|
||||
@@ -1571,6 +1604,33 @@ describe("schema evolution", function () {
|
||||
expect(await table.schema()).toEqual(expectedSchema3);
|
||||
});
|
||||
|
||||
it("can update field metadata", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
const table = await con.createTable("fm", [
|
||||
{ id: 1, category: "a" },
|
||||
{ id: 2, category: "b" },
|
||||
]);
|
||||
|
||||
const res = await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { unit: "label", pii: "false" } },
|
||||
]);
|
||||
expect(res).toHaveProperty("version");
|
||||
expect(res.version).toBe(2);
|
||||
|
||||
let cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label");
|
||||
expect(cat?.metadata.get("pii")).toBe("false");
|
||||
|
||||
// merge: add a key, delete one via null, keep the rest
|
||||
await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { source: "import", pii: null } },
|
||||
]);
|
||||
cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
|
||||
expect(cat?.metadata.get("source")).toBe("import"); // added
|
||||
expect(cat?.metadata.has("pii")).toBe(false); // deleted
|
||||
});
|
||||
|
||||
it("can cast to various types", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
|
||||
|
||||
@@ -38,10 +38,12 @@ export {
|
||||
FragmentSummaryStats,
|
||||
Tags,
|
||||
TagContents,
|
||||
BranchContents,
|
||||
MergeResult,
|
||||
AddResult,
|
||||
AddColumnsResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
UpdateResult,
|
||||
@@ -110,6 +112,7 @@ export {
|
||||
|
||||
export {
|
||||
Table,
|
||||
Branches,
|
||||
AddDataOptions,
|
||||
UpdateOptions,
|
||||
OptimizeOptions,
|
||||
@@ -117,6 +120,7 @@ export {
|
||||
WriteProgress,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
FieldMetadataUpdate,
|
||||
} from "./table";
|
||||
|
||||
export {
|
||||
|
||||
@@ -25,13 +25,16 @@ import {
|
||||
AddColumnsSql,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
BranchContents,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
IndexStatistics,
|
||||
Branches as NativeBranches,
|
||||
OptimizeStats,
|
||||
TableStatistics,
|
||||
Tags,
|
||||
UpdateFieldMetadataResult,
|
||||
UpdateResult,
|
||||
Table as _NativeTable,
|
||||
} from "./native";
|
||||
@@ -508,6 +511,18 @@ export abstract class Table {
|
||||
abstract alterColumns(
|
||||
columnAlterations: ColumnAlteration[],
|
||||
): Promise<AlterColumnsResult>;
|
||||
|
||||
/**
|
||||
* Update per-field (column) metadata.
|
||||
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
|
||||
* update's metadata is merged into the field's existing metadata by default;
|
||||
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
|
||||
*/
|
||||
abstract updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult>;
|
||||
|
||||
/**
|
||||
* Drop one or more columns from the dataset
|
||||
*
|
||||
@@ -640,6 +655,14 @@ export abstract class Table {
|
||||
*/
|
||||
abstract tags(): Promise<Tags>;
|
||||
|
||||
/**
|
||||
* Get the branch manager for this table.
|
||||
*
|
||||
* Branches are isolated, writable lines of history forked from another
|
||||
* branch (or version). Writes on a branch do not affect `main`.
|
||||
*/
|
||||
abstract branches(): Promise<Branches>;
|
||||
|
||||
/**
|
||||
* Restore the table to the currently checked out version
|
||||
*
|
||||
@@ -1037,6 +1060,12 @@ export class LocalTable extends Table {
|
||||
return await this.inner.alterColumns(processedAlterations);
|
||||
}
|
||||
|
||||
async updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult> {
|
||||
return await this.inner.updateFieldMetadata(updates);
|
||||
}
|
||||
|
||||
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
@@ -1089,6 +1118,10 @@ export class LocalTable extends Table {
|
||||
return await this.inner.tags();
|
||||
}
|
||||
|
||||
async branches(): Promise<Branches> {
|
||||
return new Branches(await this.inner.branches());
|
||||
}
|
||||
|
||||
async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
|
||||
let cleanupOlderThanMs;
|
||||
if (
|
||||
@@ -1203,3 +1236,67 @@ export interface ColumnAlteration {
|
||||
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
|
||||
nullable?: boolean;
|
||||
}
|
||||
|
||||
/** A per-field metadata update, addressed by dot-path. */
|
||||
export interface FieldMetadataUpdate {
|
||||
/**
|
||||
* Dot-separated path to the field. For a top-level column this is just its
|
||||
* name; for a nested field it's the path, e.g. "a.b.c".
|
||||
*/
|
||||
path: string;
|
||||
/**
|
||||
* Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
* default; a value of `null` deletes that key.
|
||||
*/
|
||||
metadata: Record<string, string | null>;
|
||||
/** If true, replace the field's entire metadata map instead of merging. */
|
||||
replace?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Branch manager for a {@link Table}.
|
||||
*
|
||||
* Unlike tags, `create` and `checkout` return a new {@link Table} handle scoped
|
||||
* to the branch; writes on it do not affect `main`.
|
||||
*/
|
||||
export class Branches {
|
||||
#inner: NativeBranches;
|
||||
|
||||
/**
|
||||
* Construct a Branches manager. Internal use only.
|
||||
* @hidden
|
||||
*/
|
||||
constructor(inner: NativeBranches) {
|
||||
this.#inner = inner;
|
||||
}
|
||||
|
||||
/** List all branches, mapping name to branch metadata. */
|
||||
async list(): Promise<Record<string, BranchContents>> {
|
||||
return await this.#inner.list();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a branch and return a handle scoped to it.
|
||||
*
|
||||
* @param name Name of the new branch.
|
||||
* @param fromRef Source branch to fork from. Defaults to `main`.
|
||||
* @param fromVersion A specific version on `fromRef`. Defaults to latest.
|
||||
*/
|
||||
async create(
|
||||
name: string,
|
||||
fromRef?: string,
|
||||
fromVersion?: number,
|
||||
): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.create(name, fromRef, fromVersion));
|
||||
}
|
||||
|
||||
/** Check out an existing branch and return a handle scoped to it. */
|
||||
async checkout(name: string): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.checkout(name));
|
||||
}
|
||||
|
||||
/** Delete a branch. */
|
||||
async delete(name: string): Promise<void> {
|
||||
return await this.#inner.delete(name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,9 @@ use std::collections::HashMap;
|
||||
|
||||
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
|
||||
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
|
||||
OptimizeOptions, Ref, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
@@ -355,6 +356,23 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: Vec<FieldMetadataUpdate>,
|
||||
) -> napi::Result<UpdateFieldMetadataResult> {
|
||||
let updates = updates
|
||||
.into_iter()
|
||||
.map(LanceFieldMetadataUpdate::from)
|
||||
.collect::<Vec<_>>();
|
||||
let res = self
|
||||
.inner_ref()?
|
||||
.update_field_metadata(&updates)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
|
||||
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
@@ -460,6 +478,13 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn branches(&self) -> napi::Result<Branches> {
|
||||
Ok(Branches {
|
||||
inner: self.inner_ref()?.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn optimize(
|
||||
&self,
|
||||
@@ -747,6 +772,29 @@ pub struct ColumnAlteration {
|
||||
pub nullable: Option<bool>,
|
||||
}
|
||||
|
||||
/// A per-field metadata update, addressed by dot-path. Merges into the field's
|
||||
/// existing metadata by default; a `null` value deletes a key, and `replace`
|
||||
/// swaps the field's entire metadata map.
|
||||
#[napi(object)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
|
||||
pub path: String,
|
||||
/// Metadata keys to set; a `null` value deletes that key.
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If true, replace the field's entire metadata map instead of merging.
|
||||
pub replace: Option<bool>,
|
||||
}
|
||||
|
||||
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
|
||||
fn from(js: FieldMetadataUpdate) -> Self {
|
||||
Self {
|
||||
path: js.path,
|
||||
metadata: js.metadata,
|
||||
replace: js.replace.unwrap_or(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
|
||||
type Error = String;
|
||||
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
|
||||
@@ -987,6 +1035,19 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: value.version as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct DropColumnsResult {
|
||||
pub version: i64,
|
||||
@@ -1006,6 +1067,13 @@ pub struct TagContents {
|
||||
pub manifest_size: i64,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct BranchContents {
|
||||
pub parent_branch: Option<String>,
|
||||
pub parent_version: i64,
|
||||
pub manifest_size: i64,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct Tags {
|
||||
inner: LanceDbTable,
|
||||
@@ -1074,3 +1142,60 @@ impl Tags {
|
||||
.default_error()
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct Branches {
|
||||
inner: LanceDbTable,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
impl Branches {
|
||||
#[napi]
|
||||
pub async fn list(&self) -> napi::Result<HashMap<String, BranchContents>> {
|
||||
let branches = self.inner.list_branches().await.default_error()?;
|
||||
let result = branches
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
(
|
||||
k,
|
||||
BranchContents {
|
||||
parent_branch: v.parent_branch,
|
||||
parent_version: v.parent_version as i64,
|
||||
manifest_size: v.manifest_size as i64,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn create(
|
||||
&self,
|
||||
name: String,
|
||||
from_ref: Option<String>,
|
||||
from_version: Option<i64>,
|
||||
) -> napi::Result<Table> {
|
||||
// "main" and None are two spellings of the root branch; normalize so
|
||||
// from_ref = "main" behaves identically to the default.
|
||||
let from_ref = from_ref.filter(|b| b != "main");
|
||||
let from = Ref::Version(from_ref, from_version.map(|v| v as u64));
|
||||
let table = self
|
||||
.inner
|
||||
.create_branch(&name, from)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(Table::new(table))
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn checkout(&self, name: String) -> napi::Result<Table> {
|
||||
let table = self.inner.checkout_branch(&name).await.default_error()?;
|
||||
Ok(Table::new(table))
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn delete(&self, name: String) -> napi::Result<()> {
|
||||
self.inner.delete_branch(&name).await.default_error()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,6 +208,9 @@ class Table:
|
||||
async def alter_columns(
|
||||
self, columns: list[dict[str, Any]]
|
||||
) -> AlterColumnsResult: ...
|
||||
async def update_field_metadata(
|
||||
self, updates: list[dict[str, Any]]
|
||||
) -> UpdateFieldMetadataResult: ...
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -223,6 +226,8 @@ class Table:
|
||||
async def close_lsm_writers(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
@property
|
||||
def branches(self) -> Branches: ...
|
||||
def query(self) -> Query: ...
|
||||
def take_offsets(self, offsets: list[int]) -> TakeQuery: ...
|
||||
def take_row_ids(self, row_ids: list[int]) -> TakeQuery: ...
|
||||
@@ -235,6 +240,17 @@ class Tags:
|
||||
async def delete(self, tag: str): ...
|
||||
async def update(self, tag: str, version: int): ...
|
||||
|
||||
class Branches:
|
||||
async def list(self) -> Dict[str, Any]: ...
|
||||
async def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> Table: ...
|
||||
async def checkout(self, name: str) -> Table: ...
|
||||
async def delete(self, name: str) -> None: ...
|
||||
|
||||
class IndexConfig:
|
||||
name: str
|
||||
index_type: str
|
||||
@@ -460,6 +476,9 @@ class AddColumnsResult:
|
||||
class AlterColumnsResult:
|
||||
version: int
|
||||
|
||||
class UpdateFieldMetadataResult:
|
||||
version: int
|
||||
|
||||
class DropColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from lancedb._lancedb import (
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
@@ -850,6 +851,11 @@ class RemoteTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
|
||||
@@ -154,6 +154,7 @@ if TYPE_CHECKING:
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
@@ -757,6 +758,15 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def branches(self) -> "Branches":
|
||||
"""Branch management for the table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect ``main``.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""The number of rows in this Table"""
|
||||
return self.count_rows(None)
|
||||
@@ -1799,6 +1809,29 @@ class Table(ABC):
|
||||
version: the new version number of the table after the alteration.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""
|
||||
Update per-field (column) metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
updates : dict
|
||||
One or more dicts, each with:
|
||||
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
|
||||
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
|
||||
deletes that key.
|
||||
- "replace": bool, optional — replace the field's whole metadata map
|
||||
instead of merging (default False).
|
||||
|
||||
Returns
|
||||
-------
|
||||
UpdateFieldMetadataResult
|
||||
version: the new table version after the update.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
"""
|
||||
@@ -2143,6 +2176,15 @@ class LanceTable(Table):
|
||||
"""
|
||||
return Tags(self._table)
|
||||
|
||||
@property
|
||||
def branches(self) -> "Branches":
|
||||
"""Branch management for the table.
|
||||
|
||||
``create``/``checkout`` return a new table handle scoped to the branch;
|
||||
writes on it do not affect ``main``.
|
||||
"""
|
||||
return Branches(self._table)
|
||||
|
||||
def checkout(self, version: Union[int, str]):
|
||||
"""Checkout a version of the table. This is an in-place operation.
|
||||
|
||||
@@ -3583,6 +3625,11 @@ class LanceTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
@@ -3637,10 +3684,18 @@ class LanceTable(Table):
|
||||
"""
|
||||
LOOP.run(self._table.migrate_v2_manifest_paths())
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -5234,6 +5289,13 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.alter_columns(alterations)
|
||||
|
||||
async def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""Update per-field metadata. See
|
||||
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
|
||||
return await self._inner.update_field_metadata(updates)
|
||||
|
||||
async def drop_columns(self, columns: Iterable[str]):
|
||||
"""
|
||||
Drop columns from the table.
|
||||
@@ -5398,6 +5460,15 @@ class AsyncTable:
|
||||
"""
|
||||
return AsyncTags(self._inner)
|
||||
|
||||
@property
|
||||
def branches(self) -> AsyncBranches:
|
||||
"""Branch management for the table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect ``main``.
|
||||
"""
|
||||
return AsyncBranches(self._inner)
|
||||
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -5518,12 +5589,20 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.migrate_manifest_paths_v2()
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
async def replace_field_metadata(
|
||||
self, field_name: str, new_metadata: dict[str, str]
|
||||
):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -5725,6 +5804,50 @@ class Tags:
|
||||
LOOP.run(self._table.tags.update(tag, version))
|
||||
|
||||
|
||||
class Branches:
|
||||
"""
|
||||
Table branch manager.
|
||||
"""
|
||||
|
||||
def __init__(self, table):
|
||||
self._table = table
|
||||
|
||||
def list(self) -> Dict[str, Any]:
|
||||
"""List all branches, mapping name to branch metadata."""
|
||||
return LOOP.run(self._table.branches.list())
|
||||
|
||||
def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> "LanceTable":
|
||||
"""Create a branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the new branch.
|
||||
from_ref: str, optional
|
||||
Source branch to fork from. Defaults to ``main``.
|
||||
from_version: int, optional
|
||||
A specific version on ``from_ref`` to fork from. Defaults to latest.
|
||||
"""
|
||||
async_table = LOOP.run(
|
||||
self._table.branches.create(name, from_ref, from_version)
|
||||
)
|
||||
return LanceTable.from_inner(async_table._inner)
|
||||
|
||||
def checkout(self, name: str) -> "LanceTable":
|
||||
"""Check out an existing branch and return a handle scoped to it."""
|
||||
async_table = LOOP.run(self._table.branches.checkout(name))
|
||||
return LanceTable.from_inner(async_table._inner)
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
"""Delete a branch."""
|
||||
LOOP.run(self._table.branches.delete(name))
|
||||
|
||||
|
||||
class AsyncTags:
|
||||
"""
|
||||
Async table tag manager.
|
||||
@@ -5792,3 +5915,47 @@ class AsyncTags:
|
||||
The new table version to tag.
|
||||
"""
|
||||
await self._table.tags.update(tag, version)
|
||||
|
||||
|
||||
class AsyncBranches:
|
||||
"""Async table branch manager."""
|
||||
|
||||
def __init__(self, table):
|
||||
self._table = table
|
||||
|
||||
async def list(self) -> Dict[str, Any]:
|
||||
"""List all branches, mapping name to branch metadata."""
|
||||
return await self._table.branches.list()
|
||||
|
||||
async def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> "AsyncTable":
|
||||
"""Create a branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the new branch.
|
||||
from_ref: str, optional
|
||||
Source branch to fork from. Defaults to ``main``.
|
||||
from_version: int, optional
|
||||
A specific version on ``from_ref`` to fork from. Defaults to latest.
|
||||
"""
|
||||
# "main" and None are two spellings of the root branch in lance; normalize
|
||||
# so from_ref="main" behaves identically to the default.
|
||||
if from_ref == "main":
|
||||
from_ref = None
|
||||
inner = await self._table.branches.create(name, from_ref, from_version)
|
||||
return AsyncTable(inner)
|
||||
|
||||
async def checkout(self, name: str) -> "AsyncTable":
|
||||
"""Check out an existing branch and return a handle scoped to it."""
|
||||
inner = await self._table.branches.checkout(name)
|
||||
return AsyncTable(inner)
|
||||
|
||||
async def delete(self, name: str) -> None:
|
||||
"""Delete a branch."""
|
||||
await self._table.branches.delete(name)
|
||||
|
||||
@@ -903,6 +903,79 @@ async def test_async_tags(mem_db_async: AsyncConnection):
|
||||
)
|
||||
|
||||
|
||||
def test_branches(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table(
|
||||
"test",
|
||||
data=[
|
||||
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
|
||||
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
|
||||
],
|
||||
)
|
||||
assert table.count_rows() == 2
|
||||
|
||||
# fork an isolated, writable branch from main
|
||||
branch = table.branches.create("exp")
|
||||
assert branch.count_rows() == 2
|
||||
branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
|
||||
|
||||
# writes on the branch do not touch main
|
||||
assert branch.count_rows() == 3
|
||||
assert table.count_rows() == 2
|
||||
|
||||
# the branch is listed, with main (None) as its parent
|
||||
branches = table.branches.list()
|
||||
assert "exp" in branches
|
||||
assert branches["exp"]["parent_branch"] is None
|
||||
|
||||
# from_ref="main" is equivalent to the default
|
||||
table.branches.create("exp2", from_ref="main")
|
||||
assert table.branches.list()["exp2"]["parent_branch"] is None
|
||||
|
||||
# checkout returns a handle scoped to the branch's latest
|
||||
checked_out = table.branches.checkout("exp")
|
||||
assert checked_out.count_rows() == 3
|
||||
|
||||
# delete removes it
|
||||
table.branches.delete("exp")
|
||||
table.branches.delete("exp2")
|
||||
assert "exp" not in table.branches.list()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_branches(tmp_path):
|
||||
db = await lancedb.connect_async(tmp_path)
|
||||
table = await db.create_table(
|
||||
"test",
|
||||
data=[
|
||||
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
|
||||
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
|
||||
],
|
||||
)
|
||||
assert await table.count_rows() == 2
|
||||
|
||||
branch = await table.branches.create("exp")
|
||||
assert await branch.count_rows() == 2
|
||||
await branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
|
||||
|
||||
assert await branch.count_rows() == 3
|
||||
assert await table.count_rows() == 2
|
||||
|
||||
branches = await table.branches.list()
|
||||
assert "exp" in branches
|
||||
assert branches["exp"]["parent_branch"] is None
|
||||
|
||||
await table.branches.create("exp2", from_ref="main")
|
||||
assert (await table.branches.list())["exp2"]["parent_branch"] is None
|
||||
|
||||
checked_out = await table.branches.checkout("exp")
|
||||
assert await checked_out.count_rows() == 3
|
||||
|
||||
await table.branches.delete("exp")
|
||||
await table.branches.delete("exp2")
|
||||
assert "exp" not in await table.branches.list()
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
@@ -2472,6 +2545,30 @@ def test_alter_columns(mem_db: DBConnection):
|
||||
assert table.to_arrow().column_names == ["new_id"]
|
||||
|
||||
|
||||
def test_update_field_metadata(mem_db: DBConnection):
|
||||
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
|
||||
table = mem_db.create_table("my_table", data=data)
|
||||
|
||||
res = table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
|
||||
)
|
||||
assert res.version == 2
|
||||
# Arrow field metadata is bytes-keyed
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"pii": b"false",
|
||||
}
|
||||
|
||||
# merge: add a key, delete one via None, keep the rest
|
||||
table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"source": "import", "pii": None}}
|
||||
)
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"source": b"import",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alter_columns_async(mem_db_async: AsyncConnection):
|
||||
data = pa.table({"id": [0, 1]})
|
||||
|
||||
@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateResult,
|
||||
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -50,6 +50,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<RecordBatchStream>()?;
|
||||
m.add_class::<AddColumnsResult>()?;
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<UpdateFieldMetadataResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
|
||||
@@ -16,12 +16,12 @@ use arrow::{
|
||||
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
|
||||
};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
|
||||
Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Ref, Table as LanceDbTable,
|
||||
};
|
||||
use pyo3::{
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
|
||||
exceptions::{PyRuntimeError, PyValueError},
|
||||
pyclass, pymethods,
|
||||
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
|
||||
};
|
||||
@@ -357,6 +357,27 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl UpdateFieldMetadataResult {
|
||||
pub fn __repr__(&self) -> String {
|
||||
format!("UpdateFieldMetadataResult(version={})", self.version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: result.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DropColumnsResult {
|
||||
@@ -843,6 +864,11 @@ impl Table {
|
||||
Ok(Tags::new(self.inner_ref()?.clone()))
|
||||
}
|
||||
|
||||
#[getter]
|
||||
pub fn branches(&self) -> PyResult<Branches> {
|
||||
Ok(Branches::new(self.inner_ref()?.clone()))
|
||||
}
|
||||
|
||||
#[pyo3(signature = (offsets))]
|
||||
pub fn take_offsets(self_: PyRef<'_, Self>, offsets: Vec<u64>) -> PyResult<TakeQuery> {
|
||||
Ok(TakeQuery::new(
|
||||
@@ -1102,31 +1128,57 @@ impl Table {
|
||||
field_name: String,
|
||||
metadata: &Bound<'_, PyDict>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let mut new_metadata = HashMap::<String, String>::new();
|
||||
for (column_name, value) in metadata.into_iter() {
|
||||
let key: String = column_name.extract()?;
|
||||
let value: String = value.extract()?;
|
||||
new_metadata.insert(key, value);
|
||||
// Deprecated: forwards to the update_field_metadata path (replace mode).
|
||||
let mut update = FieldMetadataUpdate::new(field_name).replace();
|
||||
for (key, value) in metadata.into_iter() {
|
||||
update = update.set(key.extract::<String>()?, value.extract::<String>()?);
|
||||
}
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let native_tbl = inner
|
||||
.as_native()
|
||||
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
|
||||
let schema = native_tbl.manifest().await.infer_error()?.schema;
|
||||
let field = schema
|
||||
.field(&field_name)
|
||||
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
|
||||
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
|
||||
.await
|
||||
.infer_error()?;
|
||||
|
||||
inner.update_field_metadata(&[update]).await.infer_error()?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_field_metadata<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
updates: Vec<Bound<PyDict>>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let updates = updates
|
||||
.iter()
|
||||
.map(|update| {
|
||||
let path: String = update
|
||||
.get_item("path")?
|
||||
.ok_or_else(|| PyValueError::new_err("Missing path"))?
|
||||
.extract()?;
|
||||
let mut field_update = FieldMetadataUpdate::new(path);
|
||||
if let Some(metadata) = update.get_item("metadata")? {
|
||||
let metadata_dict = metadata.cast::<PyDict>()?;
|
||||
for (key, value) in metadata_dict.iter() {
|
||||
let key: String = key.extract()?;
|
||||
if value.is_none() {
|
||||
field_update = field_update.remove(key);
|
||||
} else {
|
||||
field_update = field_update.set(key, value.extract::<String>()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(replace) = update.get_item("replace")?
|
||||
&& replace.extract::<bool>()?
|
||||
{
|
||||
field_update = field_update.replace();
|
||||
}
|
||||
Ok(field_update)
|
||||
})
|
||||
.collect::<PyResult<Vec<_>>>()?;
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let result = inner.update_field_metadata(&updates).await.infer_error()?;
|
||||
Ok(UpdateFieldMetadataResult::from(result))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
@@ -1218,3 +1270,66 @@ impl Tags {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
pub struct Branches {
|
||||
inner: LanceDbTable,
|
||||
}
|
||||
|
||||
impl Branches {
|
||||
pub fn new(table: LanceDbTable) -> Self {
|
||||
Self { inner: table }
|
||||
}
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl Branches {
|
||||
pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let res = inner.list_branches().await.infer_error()?;
|
||||
Python::attach(|py| {
|
||||
let py_dict = PyDict::new(py);
|
||||
for (name, contents) in res {
|
||||
let value = PyDict::new(py);
|
||||
value.set_item("parent_branch", contents.parent_branch)?;
|
||||
value.set_item("parent_version", contents.parent_version)?;
|
||||
value.set_item("manifest_size", contents.manifest_size)?;
|
||||
py_dict.set_item(name, value)?;
|
||||
}
|
||||
Ok(py_dict.unbind())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (name, from_ref=None, from_version=None))]
|
||||
pub fn create(
|
||||
self_: PyRef<'_, Self>,
|
||||
name: String,
|
||||
from_ref: Option<String>,
|
||||
from_version: Option<u64>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let from = Ref::Version(from_ref, from_version);
|
||||
let table = inner.create_branch(&name, from).await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn checkout(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let table = inner.checkout_branch(&name).await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn delete(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.delete_branch(&name).await.infer_error()?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
|
||||
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
|
||||
use crate::table::AddColumnsResult;
|
||||
use crate::table::AddResult;
|
||||
use crate::table::AlterColumnsResult;
|
||||
use crate::table::DeleteResult;
|
||||
use crate::table::DropColumnsResult;
|
||||
use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
|
||||
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{
|
||||
@@ -1383,6 +1383,38 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
.map_err(unwrap_shared_error)
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
_name: &str,
|
||||
_from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, _name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, lance::dataset::refs::BranchContents>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, _name: &str) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
|
||||
@@ -1968,6 +2000,35 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "updates": updates });
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/update_field_metadata/",
|
||||
self.identifier
|
||||
))
|
||||
.json(&body);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let result: UpdateFieldMetadataResult =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse update_field_metadata response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "columns": columns });
|
||||
@@ -2261,6 +2322,7 @@ mod tests {
|
||||
|
||||
use crate::remote::client::{ClientConfig, RetryConfig};
|
||||
use crate::table::AddDataMode;
|
||||
use crate::table::FieldMetadataUpdate;
|
||||
|
||||
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
|
||||
@@ -6460,4 +6522,25 @@ mod tests {
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/update_field_metadata/"
|
||||
);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.version, 7);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,12 +86,15 @@ pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
|
||||
pub use chrono::Duration;
|
||||
pub use delete::DeleteResult;
|
||||
use futures::future::join_all;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::refs::{BranchContents, Ref, TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
|
||||
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
|
||||
pub use schema_evolution::{
|
||||
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
|
||||
UpdateFieldMetadataResult,
|
||||
};
|
||||
use serde_with::skip_serializing_none;
|
||||
pub use update::{UpdateBuilder, UpdateResult};
|
||||
|
||||
@@ -622,6 +625,20 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn restore(&self) -> Result<()>;
|
||||
/// List the versions of the table.
|
||||
async fn list_versions(&self) -> Result<Vec<Version>>;
|
||||
/// Create a new branch from `from` and return a handle scoped to it.
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>>;
|
||||
/// List the branches of the table.
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>>;
|
||||
/// Delete a branch.
|
||||
async fn delete_branch(&self, name: &str) -> Result<()>;
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
fn current_branch(&self) -> Option<String>;
|
||||
/// Get the table definition.
|
||||
async fn table_definition(&self) -> Result<TableDefinition>;
|
||||
/// Get the table URI (storage location)
|
||||
@@ -660,6 +677,19 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "create_insert_exec not implemented".to_string(),
|
||||
})
|
||||
}
|
||||
/// Update per-field metadata. Merges into existing metadata by default;
|
||||
/// [`FieldMetadataUpdate::remove`] deletes a key and
|
||||
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
|
||||
///
|
||||
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
_updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
Err(Error::NotSupported {
|
||||
message: "update_field_metadata is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1340,6 +1370,14 @@ impl Table {
|
||||
self.inner.alter_columns(alterations).await
|
||||
}
|
||||
|
||||
/// Update per-field metadata (merges by default).
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.inner.update_field_metadata(updates).await
|
||||
}
|
||||
|
||||
/// Remove columns from the table.
|
||||
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.inner.drop_columns(columns).await
|
||||
@@ -1601,6 +1639,46 @@ impl Table {
|
||||
self.inner.tags().await
|
||||
}
|
||||
|
||||
/// Create a new branch from `from` (a version, tag, or branch) and return
|
||||
/// a writable, isolated handle scoped to it. `self` is unaffected.
|
||||
pub async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: impl Into<lance::dataset::refs::Ref>,
|
||||
) -> Result<Self> {
|
||||
let inner = self.inner.create_branch(name, from.into()).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
pub async fn checkout_branch(&self, name: &str) -> Result<Self> {
|
||||
let inner = self.inner.checkout_branch(name).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// List the branches of the table.
|
||||
pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
self.inner.list_branches().await
|
||||
}
|
||||
|
||||
/// Delete a branch.
|
||||
pub async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
self.inner.delete_branch(name).await
|
||||
}
|
||||
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.inner.current_branch()
|
||||
}
|
||||
|
||||
/// Retrieve statistics on the table
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
self.inner.stats().await
|
||||
@@ -1837,6 +1915,21 @@ impl NativeTable {
|
||||
self
|
||||
}
|
||||
|
||||
/// Build a sibling `NativeTable` with the same identity but a different
|
||||
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
||||
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
||||
Self {
|
||||
name: self.name.clone(),
|
||||
namespace: self.namespace.clone(),
|
||||
id: self.id.clone(),
|
||||
uri: self.uri.clone(),
|
||||
dataset,
|
||||
read_consistency_interval: self.read_consistency_interval,
|
||||
namespace_client: self.namespace_client.clone(),
|
||||
pushdown_operations: self.pushdown_operations.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens an existing Table using a namespace client.
|
||||
///
|
||||
/// This method uses `DatasetBuilder::from_namespace` to open the table, which
|
||||
@@ -2580,6 +2673,7 @@ impl NativeTable {
|
||||
/// field id and the second element is a hashmap of metadata key-value
|
||||
/// pairs.
|
||||
///
|
||||
#[deprecated(since = "0.33.1", note = "Use `update_field_metadata` instead")]
|
||||
pub async fn replace_field_metadata(
|
||||
&self,
|
||||
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
|
||||
@@ -2627,6 +2721,43 @@ impl BaseTable for NativeTable {
|
||||
self.dataset.reload().await
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
let branch_ds = ds.create_branch(name, from, None).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
let branch_ds = self.dataset.get().await?.checkout_branch(name).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
Ok(self.dataset.get().await?.list_branches().await?)
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
ds.delete_branch(name).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
self.dataset.current_branch()
|
||||
}
|
||||
|
||||
async fn list_versions(&self) -> Result<Vec<Version>> {
|
||||
Ok(self.dataset.get().await?.versions().await?)
|
||||
}
|
||||
@@ -2886,6 +3017,13 @@ impl BaseTable for NativeTable {
|
||||
schema_evolution::execute_alter_columns(self, alterations).await
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
schema_evolution::execute_drop_columns(self, columns).await
|
||||
}
|
||||
@@ -3136,7 +3274,6 @@ pub struct FragmentSummaryStats {
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
@@ -3347,6 +3484,56 @@ mod tests {
|
||||
assert_eq!(table.version().await.unwrap(), 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branches() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// main: one row at v1
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 1);
|
||||
assert_eq!(table.current_branch(), None);
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// branch off main's current version; it starts with main's data
|
||||
let branch = table.create_branch("exp", main_version).await.unwrap();
|
||||
assert_eq!(branch.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// writes on the branch are isolated from main
|
||||
branch.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 2);
|
||||
assert_eq!(
|
||||
table.count_rows(None).await.unwrap(),
|
||||
1,
|
||||
"main must be untouched by branch writes"
|
||||
);
|
||||
|
||||
// the branch shows up in the listing
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(branches.contains_key("exp"));
|
||||
|
||||
// checking out the branch from the main handle sees the branch's latest data
|
||||
let checked_out = table.checkout_branch("exp").await.unwrap();
|
||||
assert_eq!(checked_out.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(checked_out.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// delete removes it from the listing
|
||||
table.delete_branch("exp").await.unwrap();
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(!branches.contains_key("exp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index() {
|
||||
use arrow_array::RecordBatch;
|
||||
@@ -4449,10 +4636,10 @@ mod tests {
|
||||
Some(&"test_val2_update".to_string())
|
||||
);
|
||||
|
||||
let mut new_field_metadata = HashMap::<String, String>::new();
|
||||
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
|
||||
.update_field_metadata(&[
|
||||
FieldMetadataUpdate::new("i").set("test_field_key1", "test_field_val1")
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -144,8 +144,19 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
|
||||
/// Checkout a branch and track its HEAD for new versions.
|
||||
pub async fn as_branch(&self, _branch: impl Into<String>) -> Result<()> {
|
||||
todo!("Branch support not yet implemented")
|
||||
pub async fn as_branch(&self, branch: impl Into<String>) -> Result<()> {
|
||||
let branch = branch.into();
|
||||
let dataset = { self.state.lock()?.dataset.clone() };
|
||||
let new_dataset = dataset.checkout_branch(&branch).await?;
|
||||
|
||||
let mut state = self.state.lock()?;
|
||||
state.dataset = Arc::new(new_dataset);
|
||||
state.pinned_version = None;
|
||||
drop(state);
|
||||
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
|
||||
bg_cache.invalidate();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the dataset is in a mutable mode (Latest).
|
||||
@@ -161,6 +172,17 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// The branch this wrapper is currently tracking, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.state
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.dataset
|
||||
.manifest()
|
||||
.branch
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Returns the version, if in time travel mode, or None otherwise.
|
||||
pub fn time_travel_version(&self) -> Option<u64> {
|
||||
self.state
|
||||
@@ -737,4 +759,31 @@ mod tests {
|
||||
let result = wrapper.reload().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_as_branch_is_writable_and_tracked() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
// v1 on main, then shallow-clone a branch off it
|
||||
let mut ds = create_test_dataset(uri).await;
|
||||
let v1 = ds.version().version;
|
||||
ds.create_branch("exp", v1, None).await.unwrap();
|
||||
|
||||
// wrapper starts on main: latest, writable, no branch
|
||||
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||
assert_eq!(wrapper.current_branch(), None);
|
||||
|
||||
// switch to the branch
|
||||
wrapper.as_branch("exp").await.unwrap();
|
||||
assert_eq!(wrapper.current_branch().as_deref(), Some("exp"));
|
||||
|
||||
// a branch is writable (unlike a pinned/time-travel checkout)
|
||||
wrapper.ensure_mutable().unwrap();
|
||||
assert_eq!(wrapper.time_travel_version(), None);
|
||||
|
||||
// get() returns the branch dataset
|
||||
let on_branch = wrapper.get().await.unwrap();
|
||||
assert_eq!(on_branch.manifest().branch.as_deref(), Some("exp"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
use lance::dataset::{ColumnAlteration, NewColumnTransform};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
@@ -44,6 +45,52 @@ pub struct DropColumnsResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// A single field's metadata update, addressed by dot-path.
|
||||
///
|
||||
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
|
||||
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
|
||||
pub path: String,
|
||||
/// Keys to set (`Some`) or delete (`None`).
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If `true`, replace the field's entire metadata map instead of merging.
|
||||
pub replace: bool,
|
||||
}
|
||||
|
||||
impl FieldMetadataUpdate {
|
||||
pub fn new(path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
path: path.into(),
|
||||
metadata: HashMap::new(),
|
||||
replace: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), Some(value.into()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn remove(mut self, key: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), None);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn replace(mut self) -> Self {
|
||||
self.replace = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
/// The commit version associated with the operation.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// Internal implementation of the add columns logic.
|
||||
///
|
||||
/// Adds new columns to the table using the provided transforms.
|
||||
@@ -90,6 +137,32 @@ pub(crate) async fn execute_drop_columns(
|
||||
Ok(DropColumnsResult { version })
|
||||
}
|
||||
|
||||
/// Internal implementation of the update field metadata logic.
|
||||
///
|
||||
/// Merges or replaces per-field metadata, addressing fields by dot-path.
|
||||
pub(crate) async fn execute_update_field_metadata(
|
||||
table: &NativeTable,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
|
||||
let mut builder = dataset.update_field_metadata();
|
||||
for update in updates {
|
||||
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
|
||||
builder = if update.replace {
|
||||
builder.replace(&update.path, entries)?
|
||||
} else {
|
||||
builder.update(&update.path, entries)?
|
||||
};
|
||||
}
|
||||
builder.await?;
|
||||
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(UpdateFieldMetadataResult { version })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_array::{Int32Array, StringArray, record_batch};
|
||||
@@ -97,6 +170,7 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::dataset::ColumnAlteration;
|
||||
|
||||
use super::FieldMetadataUpdate;
|
||||
use crate::connect;
|
||||
use crate::query::{ExecutableQuery, QueryBase, Select};
|
||||
use crate::table::NewColumnTransform;
|
||||
@@ -610,4 +684,46 @@ mod tests {
|
||||
let v4 = table.version().await.unwrap();
|
||||
assert_eq!(drop_result.version, v4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batch = record_batch!(
|
||||
("id", Int32, [1, 2, 3]),
|
||||
("category", Utf8, ["A", "B", "C"])
|
||||
)
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("test_update_field_metadata", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Set metadata on a field.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("unit", "label")
|
||||
.set("pii", "false")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let field = schema.field_with_name("category").unwrap();
|
||||
assert_eq!(
|
||||
field.metadata().get("unit").map(String::as_str),
|
||||
Some("label")
|
||||
);
|
||||
|
||||
// Merge: add a key, delete one, keep the rest.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("source", "import")
|
||||
.remove("pii")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let md = schema.field_with_name("category").unwrap().metadata();
|
||||
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
|
||||
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
|
||||
assert!(!md.contains_key("pii")); // deleted
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user