feat: add list_versions to typescript, rust and remote python sdks (#1850)

Requires updating the lance dependency to pick up the change that makes
`Version` serializable:
https://github.com/lancedb/lance/pull/3143
This commit is contained in:
Bert
2024-11-21 13:35:14 -05:00
committed by GitHub
parent 72af977a73
commit cb9a00a28d
9 changed files with 195 additions and 13 deletions

View File

@@ -21,15 +21,15 @@ categories = ["database-implementations"]
rust-version = "1.80.0" # TODO: lower this once we upgrade Lance again.
[workspace.dependencies]
lance = { "version" = "=0.19.3", "features" = [
lance = { "version" = "=0.20.0", "features" = [
"dynamodb",
], git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-index = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-linalg = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-table = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-testing = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-datafusion = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
lance-encoding = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" }
], git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-index = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-linalg = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-table = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-testing = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-datafusion = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
lance-encoding = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" }
# Note that this one does not include pyarrow
arrow = { version = "52.2", optional = false }
arrow-array = "52.2"

View File

@@ -87,6 +87,12 @@ export interface OptimizeOptions {
deleteUnverified: boolean;
}
/**
 * Describes a single version of a table, as returned by
 * `Table.listVersions`.
 */
export interface Version {
/** The version number. */
version: number;
/** Timestamp recorded for this version. */
timestamp: Date;
/** String key/value metadata attached to this version. */
metadata: Record<string, string>;
}
/**
* A Table is a collection of Records in a LanceDB Database.
*
@@ -360,6 +366,11 @@ export abstract class Table {
*/
abstract checkoutLatest(): Promise<void>;
/**
 * List all the versions of the table.
 *
 * @returns a promise resolving to an array of {@link Version} entries
 */
abstract listVersions(): Promise<Version[]>;
/**
* Restore the table to the currently checked out version
*
@@ -659,6 +670,14 @@ export class LocalTable extends Table {
await this.inner.checkoutLatest();
}
/**
 * List all the versions of the table.
 *
 * Fetches the raw entries from the inner table and converts each
 * timestamp into a JavaScript `Date`.
 */
async listVersions(): Promise<Version[]> {
  const rawVersions = await this.inner.listVersions();
  return rawVersions.map(({ version, timestamp, metadata }) => {
    // The native binding reports microseconds; `Date` expects milliseconds.
    const createdAt = new Date(timestamp / 1000);
    return { version, timestamp: createdAt, metadata };
  });
}
/**
 * Restore the table to the currently checked out version.
 *
 * Delegates to the wrapped inner table and resolves once it completes.
 */
async restore(): Promise<void> {
await this.inner.restore();
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use arrow_ipc::writer::FileWriter;
use lancedb::ipc::ipc_file_to_batches;
use lancedb::table::{
@@ -226,6 +228,28 @@ impl Table {
self.inner_ref()?.checkout_latest().await.default_error()
}
/// List all the versions of the table.
///
/// Each returned [`Version`] carries the version number, its timestamp in
/// microseconds (see `timestamp_micros`), and the version's string metadata.
///
/// Any error from the inner table is converted with `default_error`.
#[napi(catch_unwind)]
pub async fn list_versions(&self) -> napi::Result<Vec<Version>> {
    self.inner_ref()?
        .list_versions()
        .await
        .map(|versions| {
            // `versions` is owned inside this closure, so consume it and move
            // the metadata strings instead of cloning every key and value.
            versions
                .into_iter()
                .map(|version| Version {
                    // The JS-facing struct stores the version number as i64.
                    version: version.version as i64,
                    timestamp: version.timestamp.timestamp_micros(),
                    metadata: version.metadata.into_iter().collect(),
                })
                .collect()
        })
        .default_error()
}
#[napi(catch_unwind)]
pub async fn restore(&self) -> napi::Result<()> {
self.inner_ref()?.restore().await.default_error()
@@ -466,3 +490,10 @@ impl From<lancedb::index::IndexStatistics> for IndexStatistics {
}
}
}
/// One version of a table, as returned by `Table::list_versions`.
#[napi(object)]
pub struct Version {
/// Version number, cast to `i64` from the underlying version value.
pub version: i64,
/// Timestamp in microseconds, as produced by `timestamp_micros()`.
pub timestamp: i64,
/// String key/value metadata copied from the underlying version.
pub metadata: HashMap<String, String>,
}

View File

@@ -4,7 +4,7 @@ name = "lancedb"
dependencies = [
"deprecation",
"nest-asyncio~=1.0",
"pylance==0.19.3b1",
"pylance==0.20.0b1",
"tqdm>=4.27.0",
"pydantic>=1.10",
"packaging",

View File

@@ -78,6 +78,10 @@ class RemoteTable(Table):
self.schema.metadata
)
def list_versions(self):
    """List all versions of the table.

    Runs the wrapped async table's ``list_versions`` coroutine to completion
    on this table's event loop and returns whatever it produces.
    """
    run = self._loop.run_until_complete
    pending = self._table.list_versions()
    return run(pending)
def to_arrow(self) -> pa.Table:
    """to_arrow() is not yet supported on LanceDB cloud.

    Raises
    ------
    NotImplementedError
        Always.
    """
    reason = "to_arrow() is not yet supported on LanceDB cloud."
    raise NotImplementedError(reason)

View File

@@ -8,7 +8,7 @@ import inspect
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import timedelta
from datetime import datetime, timedelta
from functools import cached_property
from typing import (
TYPE_CHECKING,
@@ -1041,6 +1041,10 @@ class Table(ABC):
It can also be used to undo a `[Self::checkout]` operation
"""
@abstractmethod
def list_versions(self):
    """List all versions of the table.

    Abstract: concrete table classes must provide the implementation.
    """
@cached_property
def _dataset_uri(self) -> str:
    """URI string for this table, built by ``_table_uri``.

    Computed from the connection's URI and the table name; because this is a
    ``cached_property`` the value is stored after the first access.
    """
    return _table_uri(self._conn.uri, self.name)
@@ -2931,6 +2935,19 @@ class AsyncTable:
"""
return await self._inner.version()
async def list_versions(self):
    """
    List all versions of the table

    Returns the list produced by the inner table. Each entry's
    ``"timestamp"`` (integer nanoseconds since the Unix epoch) is replaced
    in place by a naive, local-time ``datetime`` truncated to microseconds.
    """
    versions = await self._inner.list_versions()
    for v in versions:
        # Use integer arithmetic: a nanosecond epoch value (~1.7e18) does not
        # fit in a float exactly, so dividing by ``1e9`` could shift the
        # result across a microsecond (or even second) boundary.
        seconds, nanos = divmod(v["timestamp"], 1_000_000_000)
        v["timestamp"] = datetime.fromtimestamp(seconds) + timedelta(
            microseconds=nanos // 1_000
        )
    return versions
async def checkout(self, version):
"""
Checks out a specific version of the Table

View File

@@ -8,7 +8,7 @@ use lancedb::table::{
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{PyDict, PyDictMethods, PyString},
types::{IntoPyDict, PyDict, PyDictMethods, PyString},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python, ToPyObject,
};
use pyo3_asyncio_0_21::tokio::future_into_py;
@@ -246,6 +246,33 @@ impl Table {
)
}
/// List all versions of the table.
///
/// Returns an awaitable that resolves to a Python list of dicts with the
/// keys `"version"`, `"timestamp"` (nanoseconds, from
/// `timestamp_nanos_opt`) and `"metadata"` (a dict of strings).
pub fn list_versions(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
    let inner = self_.inner_ref()?.clone();
    future_into_py(self_.py(), async move {
        let versions = inner.list_versions().await.infer_error()?;
        Python::with_gil(|py| {
            versions
                .iter()
                .map(|v| -> PyResult<_> {
                    let dict = PyDict::new_bound(py);
                    // Surface conversion failures to Python as exceptions
                    // rather than panicking inside the async task.
                    dict.set_item("version", v.version)?;
                    dict.set_item(
                        "timestamp",
                        // Falls back to 0 when the timestamp cannot be
                        // represented as i64 nanoseconds.
                        v.timestamp.timestamp_nanos_opt().unwrap_or_default(),
                    )?;
                    // The map iterator already yields `(key, value)` pairs,
                    // so no intermediate Vec is needed.
                    dict.set_item("metadata", v.metadata.iter().into_py_dict(py))?;
                    Ok(dict.to_object(py))
                })
                .collect::<PyResult<Vec<_>>>()
        })
    })
}
pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {

View File

@@ -19,7 +19,7 @@ use http::header::CONTENT_TYPE;
use http::StatusCode;
use lance::arrow::json::JsonSchema;
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform};
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::OneShotExec;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
@@ -363,6 +363,34 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
message: "restore is not supported on LanceDB cloud.".into(),
})
}
async fn list_versions(&self) -> Result<Vec<Version>> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
struct ListVersionsResponse {
versions: Vec<Version>,
}
let body = response.text().await.err_to_http(request_id.clone())?;
let body: ListVersionsResponse =
serde_json::from_str(&body).map_err(|err| Error::Http {
source: format!(
"Failed to parse list_versions response: {}, body: {}",
err, body
)
.into(),
request_id,
status_code: None,
})?;
Ok(body.versions)
}
async fn schema(&self) -> Result<SchemaRef> {
let schema = self.describe().await?.schema;
Ok(Arc::new(schema.try_into()?))
@@ -775,6 +803,7 @@ mod tests {
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use chrono::{DateTime, Utc};
use futures::{future::BoxFuture, StreamExt, TryFutureExt};
use lance_index::scalar::FullTextSearchQuery;
use reqwest::Body;
@@ -1489,6 +1518,51 @@ mod tests {
assert_eq!(indices, expected);
}
/// `list_versions` should POST to the version-list endpoint and decode the
/// `versions` array from the response, preserving order and timestamps.
#[tokio::test]
async fn test_list_versions() {
    // Timestamps of the mocked versions; version numbers are 1-based positions.
    const TIMESTAMPS: [&str; 2] = ["2024-01-01T00:00:00Z", "2024-02-01T00:00:00Z"];

    let table = Table::new_with_handler("my_table", |request| {
        assert_eq!(request.method(), "POST");
        assert_eq!(request.url().path(), "/v1/table/my_table/version/list/");

        let served: Vec<lance::dataset::Version> = TIMESTAMPS
            .iter()
            .zip(1u64..)
            .map(|(ts, number)| lance::dataset::Version {
                version: number,
                timestamp: ts.parse().unwrap(),
                metadata: Default::default(),
            })
            .collect();
        let payload = serde_json::json!({ "versions": served });
        let response_body = serde_json::to_string(&payload).unwrap();

        http::Response::builder()
            .status(200)
            .body(response_body)
            .unwrap()
    });

    let versions = table.list_versions().await.unwrap();
    assert_eq!(versions.len(), 2);
    for (index, expected_ts) in TIMESTAMPS.iter().enumerate() {
        assert_eq!(versions[index].version, index as u64 + 1);
        assert_eq!(
            versions[index].timestamp,
            expected_ts.parse::<DateTime<Utc>>().unwrap()
        );
    }
}
#[tokio::test]
async fn test_index_stats() {
let table = Table::new_with_handler("my_table", |request| {

View File

@@ -37,7 +37,7 @@ pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
use lance::dataset::{
Dataset, UpdateBuilder as LanceUpdateBuilder, WhenMatched, WriteMode, WriteParams,
Dataset, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode, WriteParams,
};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::io::WrappingObjectStore;
@@ -426,6 +426,7 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
async fn checkout(&self, version: u64) -> Result<()>;
async fn checkout_latest(&self) -> Result<()>;
async fn restore(&self) -> Result<()>;
/// List all versions of the table.
async fn list_versions(&self) -> Result<Vec<Version>>;
async fn table_definition(&self) -> Result<TableDefinition>;
fn dataset_uri(&self) -> &str;
}
@@ -955,6 +956,11 @@ impl Table {
self.inner.restore().await
}
/// List all the versions of the table
///
/// Delegates to the underlying table implementation and returns its
/// list of [`Version`] entries, or the error it reports.
pub async fn list_versions(&self) -> Result<Vec<Version>> {
self.inner.list_versions().await
}
/// List all indices that have been created with [`Self::create_index`]
pub async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
self.inner.list_indices().await
@@ -1707,6 +1713,10 @@ impl TableInternal for NativeTable {
self.dataset.reload().await
}
/// List the versions recorded by the underlying dataset.
///
/// Obtains the dataset handle, then asks it for its versions; a failure
/// in either step is propagated to the caller.
async fn list_versions(&self) -> Result<Vec<Version>> {
    let dataset = self.dataset.get().await?;
    let versions = dataset.versions().await?;
    Ok(versions)
}
async fn restore(&self) -> Result<()> {
let version =
self.dataset