feat: add list/create/delete/update/checkout tag API (#2353)

add the tag related API to list existing tags, attach tag to a version,
update the tag version, delete tag, get the version of the tag, and
checkout the version that the tag bounded to.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced table version tagging, allowing users to create, update,
delete, and list human-readable tags for specific table versions.
  - Enabled checking out a table by either version number or tag name.
- Added new interfaces for tag management in both Python and Node.js
APIs, supporting synchronous and asynchronous workflows.

- **Bug Fixes**
  - None.

- **Documentation**
- Updated documentation to describe the new tagging features, including
usage examples.

- **Tests**
- Added comprehensive tests for tag creation, updating, deletion,
listing, and version checkout by tag in both Python and Node.js
environments.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
LuQQiu
2025-04-28 10:04:46 -07:00
committed by GitHub
parent 178bcf9c90
commit a9311c4dc0
17 changed files with 1192 additions and 99 deletions

View File

@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Dict, List, Optional, Tuple, Any, Union, Literal
from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal
import pyarrow as pa
@@ -47,7 +47,7 @@ class Table:
): ...
async def list_versions(self) -> List[Dict[str, Any]]: ...
async def version(self) -> int: ...
async def checkout(self, version: int): ...
async def checkout(self, version: Union[int, str]): ...
async def checkout_latest(self): ...
async def restore(self, version: Optional[int] = None): ...
async def list_indices(self) -> list[IndexConfig]: ...
@@ -61,9 +61,18 @@ class Table:
cleanup_since_ms: Optional[int] = None,
delete_unverified: Optional[bool] = None,
) -> OptimizeStats: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
def vector_search(self) -> VectorQuery: ...
class Tags:
async def list(self) -> Dict[str, Tag]: ...
async def get_version(self, tag: str) -> int: ...
async def create(self, tag: str, version: int): ...
async def delete(self, tag: str): ...
async def update(self, tag: str, version: int): ...
class IndexConfig:
index_type: str
columns: List[str]
@@ -195,3 +204,7 @@ class RemovalStats:
class OptimizeStats:
compaction: CompactionStats
prune: RemovalStats
class Tag(TypedDict):
version: int
manifest_size: int

View File

@@ -18,7 +18,7 @@ from lancedb.merge import LanceMergeInsertBuilder
from lancedb.embeddings import EmbeddingFunctionRegistry
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder
from ..table import AsyncTable, IndexStatistics, Query, Table
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
class RemoteTable(Table):
@@ -54,6 +54,10 @@ class RemoteTable(Table):
"""Get the current version of the table"""
return LOOP.run(self._table.version())
@property
def tags(self) -> Tags:
return Tags(self._table)
@cached_property
def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
"""
@@ -81,7 +85,7 @@ class RemoteTable(Table):
"""to_pandas() is not yet supported on LanceDB cloud."""
return NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
def checkout(self, version: int):
def checkout(self, version: Union[int, str]):
return LOOP.run(self._table.checkout(version))
def checkout_latest(self):

View File

@@ -77,6 +77,7 @@ if TYPE_CHECKING:
OptimizeStats,
CleanupStats,
CompactionStats,
Tag,
)
from .db import LanceDBConnection
from .index import IndexConfig
@@ -582,6 +583,35 @@ class Table(ABC):
"""
raise NotImplementedError
@property
@abstractmethod
def tags(self) -> Tags:
"""Tag management for the table.
Similar to Git, tags are a way to add metadata to a specific version of the
table.
.. warning::
Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
Examples
--------
.. code-block:: python
table = db.open_table("my_table")
table.tags.create("v2-prod-20250203", 10)
tags = table.tags.list()
"""
raise NotImplementedError
@property
@abstractmethod
def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
@@ -1354,7 +1384,7 @@ class Table(ABC):
"""
@abstractmethod
def checkout(self, version: int):
def checkout(self, version: Union[int, str]):
"""
Checks out a specific version of the Table
@@ -1369,6 +1399,12 @@ class Table(ABC):
Any operation that modifies the table will fail while the table is in a checked
out state.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
To return the table to a normal state use `[Self::checkout_latest]`
"""
@@ -1538,7 +1574,45 @@ class LanceTable(Table):
"""Get the current version of the table"""
return LOOP.run(self._table.version())
def checkout(self, version: int):
@property
def tags(self) -> Tags:
"""Tag management for the table.
Similar to Git, tags are a way to add metadata to a specific version of the
table.
.. warning::
Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
Returns
-------
Tags
The tag manager for managing tags for the table.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table",
... [{"vector": [1.1, 0.9], "type": "vector"}])
>>> table.tags.create("v1", table.version)
>>> table.add([{"vector": [0.5, 0.2], "type": "vector"}])
>>> tags = table.tags.list()
>>> print(tags["v1"]["version"])
1
>>> table.checkout("v1")
>>> table.to_pandas()
vector type
0 [1.1, 0.9] vector
"""
return Tags(self._table)
def checkout(self, version: Union[int, str]):
"""Checkout a version of the table. This is an in-place operation.
This allows viewing previous versions of the table. If you wish to
@@ -1550,8 +1624,9 @@ class LanceTable(Table):
Parameters
----------
version : int
The version to checkout.
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
Examples
--------
@@ -3746,7 +3821,7 @@ class AsyncTable:
return versions
async def checkout(self, version: int):
async def checkout(self, version: int | str):
"""
Checks out a specific version of the Table
@@ -3761,6 +3836,12 @@ class AsyncTable:
Any operation that modifies the table will fail while the table is in a checked
out state.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
To return the table to a normal state use `[Self::checkout_latest]`
"""
try:
@@ -3798,6 +3879,24 @@ class AsyncTable:
"""
await self._inner.restore(version)
@property
def tags(self) -> AsyncTags:
"""Tag management for the dataset.
Similar to Git, tags are a way to add metadata to a specific version of the
dataset.
.. warning::
Tagged versions are exempted from the
:py:meth:`optimize(cleanup_older_than)` process.
To remove a version that has been tagged, you must first
:py:meth:`~Tags.delete` the associated tag.
"""
return AsyncTags(self._inner)
async def optimize(
self,
*,
@@ -3967,3 +4066,141 @@ class IndexStatistics:
# a dictionary instead of a class.
def __getitem__(self, key):
return getattr(self, key)
class Tags:
"""
Table tag manager.
"""
def __init__(self, table):
self._table = table
def list(self) -> Dict[str, Tag]:
"""
List all table tags.
Returns
-------
dict[str, Tag]
A dictionary mapping tag names to version numbers.
"""
return LOOP.run(self._table.tags.list())
def get_version(self, tag: str) -> int:
"""
Get the version of a tag.
Parameters
----------
tag: str,
The name of the tag to get the version for.
"""
return LOOP.run(self._table.tags.get_version(tag))
def create(self, tag: str, version: int) -> None:
"""
Create a tag for a given table version.
Parameters
----------
tag: str,
The name of the tag to create. This name must be unique among all tag
names for the table.
version: int,
The table version to tag.
"""
LOOP.run(self._table.tags.create(tag, version))
def delete(self, tag: str) -> None:
"""
Delete tag from the table.
Parameters
----------
tag: str,
The name of the tag to delete.
"""
LOOP.run(self._table.tags.delete(tag))
def update(self, tag: str, version: int) -> None:
"""
Update tag to a new version.
Parameters
----------
tag: str,
The name of the tag to update.
version: int,
The new table version to tag.
"""
LOOP.run(self._table.tags.update(tag, version))
class AsyncTags:
"""
Async table tag manager.
"""
def __init__(self, table):
self._table = table
async def list(self) -> Dict[str, Tag]:
"""
List all table tags.
Returns
-------
dict[str, Tag]
A dictionary mapping tag names to version numbers.
"""
return await self._table.tags.list()
async def get_version(self, tag: str) -> int:
"""
Get the version of a tag.
Parameters
----------
tag: str,
The name of the tag to get the version for.
"""
return await self._table.tags.get_version(tag)
async def create(self, tag: str, version: int) -> None:
"""
Create a tag for a given table version.
Parameters
----------
tag: str,
The name of the tag to create. This name must be unique among all tag
names for the table.
version: int,
The table version to tag.
"""
await self._table.tags.create(tag, version)
async def delete(self, tag: str) -> None:
"""
Delete tag from the table.
Parameters
----------
tag: str,
The name of the tag to delete.
"""
await self._table.tags.delete(tag)
async def update(self, tag: str, version: int) -> None:
"""
Update tag to a new version.
Parameters
----------
tag: str,
The name of the tag to update.
version: int,
The new table version to tag.
"""
await self._table.tags.update(tag, version)

View File

@@ -529,6 +529,113 @@ def test_versioning(mem_db: DBConnection):
assert len(table) == 2
def test_tags(mem_db: DBConnection):
table = mem_db.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
table.tags.create("tag1", 1)
tags = table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 1
table.add(
data=[
{"vector": [10.0, 11.0], "item": "baz", "price": 30.0},
],
)
table.tags.create("tag2", 2)
tags = table.tags.list()
assert "tag1" in tags
assert "tag2" in tags
assert tags["tag1"]["version"] == 1
assert tags["tag2"]["version"] == 2
table.tags.delete("tag2")
table.tags.update("tag1", 2)
tags = table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 2
table.tags.update("tag1", 1)
tags = table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 1
table.checkout("tag1")
assert table.version == 1
assert table.count_rows() == 2
table.tags.create("tag2", 2)
table.checkout("tag2")
assert table.version == 2
assert table.count_rows() == 3
table.checkout_latest()
table.add(
data=[
{"vector": [12.0, 13.0], "item": "baz", "price": 40.0},
],
)
@pytest.mark.asyncio
async def test_async_tags(mem_db_async: AsyncConnection):
table = await mem_db_async.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
await table.tags.create("tag1", 1)
tags = await table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 1
await table.add(
data=[
{"vector": [10.0, 11.0], "item": "baz", "price": 30.0},
],
)
await table.tags.create("tag2", 2)
tags = await table.tags.list()
assert "tag1" in tags
assert "tag2" in tags
assert tags["tag1"]["version"] == 1
assert tags["tag2"]["version"] == 2
await table.tags.delete("tag2")
await table.tags.update("tag1", 2)
tags = await table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 2
await table.tags.update("tag1", 1)
tags = await table.tags.list()
assert "tag1" in tags
assert tags["tag1"]["version"] == 1
await table.checkout("tag1")
assert await table.version() == 1
assert await table.count_rows() == 2
await table.tags.create("tag2", 2)
await table.checkout("tag2")
assert await table.version() == 2
assert await table.count_rows() == 3
await table.checkout_latest()
await table.add(
data=[
{"vector": [12.0, 13.0], "item": "baz", "price": 40.0},
],
)
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_method(mock_create_index, mem_db: DBConnection):
table = mem_db.create_table(

View File

@@ -2,6 +2,11 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc};
use crate::{
error::PythonErrorExt,
index::{extract_index_params, IndexConfig},
query::Query,
};
use arrow::{
datatypes::{DataType, Schema},
ffi_stream::ArrowArrayStreamReader,
@@ -12,19 +17,13 @@ use lancedb::table::{
Table as LanceDbTable,
};
use pyo3::{
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
exceptions::{PyIOError, PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods, PyInt, PyString},
Bound, FromPyObject, PyAny, PyObject, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
use crate::{
error::PythonErrorExt,
index::{extract_index_params, IndexConfig},
query::Query,
};
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[derive(Clone, Debug)]
@@ -322,10 +321,26 @@ impl Table {
})
}
pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<Bound<'_, PyAny>> {
pub fn checkout(self_: PyRef<'_, Self>, version: PyObject) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.checkout(version).await.infer_error()
let py = self_.py();
let (is_int, int_value, string_value) = if let Ok(i) = version.downcast_bound::<PyInt>(py) {
let num: u64 = i.extract()?;
(true, num, String::new())
} else if let Ok(s) = version.downcast_bound::<PyString>(py) {
let str_value = s.to_string();
(false, 0, str_value)
} else {
return Err(PyIOError::new_err(
"version must be an integer or a string.",
));
};
future_into_py(py, async move {
if is_int {
inner.checkout(int_value).await.infer_error()
} else {
inner.checkout_tag(&string_value).await.infer_error()
}
})
}
@@ -352,6 +367,11 @@ impl Table {
Query::new(self.inner_ref().unwrap().query())
}
#[getter]
pub fn tags(&self) -> PyResult<Tags> {
Ok(Tags::new(self.inner_ref()?.clone()))
}
/// Optimize the on-disk data by compacting and pruning old data, for better performance.
#[pyo3(signature = (cleanup_since_ms=None, delete_unverified=None, retrain=None))]
pub fn optimize(
@@ -586,3 +606,72 @@ pub struct MergeInsertParams {
when_not_matched_by_source_delete: bool,
when_not_matched_by_source_condition: Option<String>,
}
#[pyclass]
pub struct Tags {
inner: LanceDbTable,
}
impl Tags {
pub fn new(table: LanceDbTable) -> Self {
Self { inner: table }
}
}
#[pymethods]
impl Tags {
pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let tags = inner.tags().await.infer_error()?;
let res = tags.list().await.infer_error()?;
Python::with_gil(|py| {
let py_dict = PyDict::new(py);
for (key, contents) in res {
let value_dict = PyDict::new(py);
value_dict.set_item("version", contents.version)?;
value_dict.set_item("manifest_size", contents.manifest_size)?;
py_dict.set_item(key, value_dict)?;
}
Ok(py_dict.unbind())
})
})
}
pub fn get_version(self_: PyRef<'_, Self>, tag: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let tags = inner.tags().await.infer_error()?;
let res = tags.get_version(tag.as_str()).await.infer_error()?;
Ok(res)
})
}
pub fn create(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.create(tag.as_str(), version).await.infer_error()?;
Ok(())
})
}
pub fn delete(self_: PyRef<Self>, tag: String) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.delete(tag.as_str()).await.infer_error()?;
Ok(())
})
}
pub fn update(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.update(tag.as_str(), version).await.infer_error()?;
Ok(())
})
}
}