From cb9a00a28d40926f7ab43b5d6233d8aee7ea6a3c Mon Sep 17 00:00:00 2001 From: Bert Date: Thu, 21 Nov 2024 13:35:14 -0500 Subject: [PATCH] feat: add list_versions to typescript, rust and remote python sdks (#1850) Will require update to lance dependency to bring in this change which makes the version serializable https://github.com/lancedb/lance/pull/3143 --- Cargo.toml | 16 +++--- nodejs/lancedb/table.ts | 19 +++++++ nodejs/src/table.rs | 31 +++++++++++ python/pyproject.toml | 2 +- python/python/lancedb/remote/table.py | 4 ++ python/python/lancedb/table.py | 19 ++++++- python/src/table.rs | 29 +++++++++- rust/lancedb/src/remote/table.rs | 76 ++++++++++++++++++++++++++- rust/lancedb/src/table.rs | 12 ++++- 9 files changed, 195 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index abf6be41..cbd99291 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,15 +21,15 @@ categories = ["database-implementations"] rust-version = "1.80.0" # TODO: lower this once we upgrade Lance again. [workspace.dependencies] -lance = { "version" = "=0.19.3", "features" = [ +lance = { "version" = "=0.20.0", "features" = [ "dynamodb", -], git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-index = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-linalg = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-table = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-testing = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-datafusion = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } -lance-encoding = { version = "=0.19.3", git = "https://github.com/lancedb/lance.git", tag = "v0.19.3-beta.1" } +], git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-index = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-linalg = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-table = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-testing = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-datafusion = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } +lance-encoding = { version = "=0.20.0", git = "https://github.com/lancedb/lance.git", tag = "v0.20.0-beta.1" } # Note that this one does not include pyarrow arrow = { version = "52.2", optional = false } arrow-array = "52.2" diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 4228fa0d..f3158c7d 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -87,6 +87,12 @@ export interface OptimizeOptions { deleteUnverified: boolean; } +export interface Version { + version: number; + timestamp: Date; + metadata: Record; +} + /** * A Table is a collection of Records in a LanceDB Database. * @@ -360,6 +366,11 @@ export abstract class Table { */ abstract checkoutLatest(): Promise; + /** + * List all the versions of the table + */ + abstract listVersions(): Promise; + /** * Restore the table to the currently checked out version * @@ -659,6 +670,14 @@ export class LocalTable extends Table { await this.inner.checkoutLatest(); } + async listVersions(): Promise { + return (await this.inner.listVersions()).map((version) => ({ + version: version.version, + timestamp: new Date(version.timestamp / 1000), + metadata: version.metadata, + })); + } + async restore(): Promise { await this.inner.restore(); } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index b2da97de..a52f9fbc 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use arrow_ipc::writer::FileWriter; use lancedb::ipc::ipc_file_to_batches; use lancedb::table::{ @@ -226,6 +228,28 @@ impl Table { self.inner_ref()?.checkout_latest().await.default_error() } + #[napi(catch_unwind)] + pub async fn list_versions(&self) -> napi::Result> { + self.inner_ref()? + .list_versions() + .await + .map(|versions| { + versions + .iter() + .map(|version| Version { + version: version.version as i64, + timestamp: version.timestamp.timestamp_micros(), + metadata: version + .metadata + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(), + }) + .collect() + }) + .default_error() + } + #[napi(catch_unwind)] pub async fn restore(&self) -> napi::Result<()> { self.inner_ref()?.restore().await.default_error() @@ -466,3 +490,10 @@ impl From for IndexStatistics { } } } + +#[napi(object)] +pub struct Version { + pub version: i64, + pub timestamp: i64, + pub metadata: HashMap, +} diff --git a/python/pyproject.toml b/python/pyproject.toml index 205a3f9a..c31d89a4 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ name = "lancedb" dependencies = [ "deprecation", "nest-asyncio~=1.0", - "pylance==0.19.3b1", + "pylance==0.20.0b1", "tqdm>=4.27.0", "pydantic>=1.10", "packaging", diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index c897cb6b..c1eea6e1 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -78,6 +78,10 @@ class RemoteTable(Table): self.schema.metadata ) + def list_versions(self): + """List all versions of the table""" + return self._loop.run_until_complete(self._table.list_versions()) + def to_arrow(self) -> pa.Table: """to_arrow() is not yet supported on LanceDB cloud.""" raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.") diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 4c77beb2..15c67dc0 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -8,7 +8,7 @@ import inspect import time from abc import ABC, abstractmethod from dataclasses import dataclass -from datetime import timedelta +from datetime import datetime, timedelta from functools import cached_property from typing import ( TYPE_CHECKING, @@ -1041,6 +1041,10 @@ class Table(ABC): It can also be used to undo a `[Self::checkout]` operation """ + @abstractmethod + def list_versions(self): + """List all versions of the table""" + @cached_property def _dataset_uri(self) -> str: return _table_uri(self._conn.uri, self.name) @@ -2931,6 +2935,19 @@ class AsyncTable: """ return await self._inner.version() + async def list_versions(self): + """ + List all versions of the table + """ + versions = await self._inner.list_versions() + for v in versions: + ts_nanos = v["timestamp"] + v["timestamp"] = datetime.fromtimestamp(ts_nanos // 1e9) + timedelta( + microseconds=(ts_nanos % 1e9) // 1e3 + ) + + return versions + async def checkout(self, version): """ Checks out a specific version of the Table diff --git a/python/src/table.rs b/python/src/table.rs index b5087f8d..bd470a63 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -8,7 +8,7 @@ use lancedb::table::{ use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, pyclass, pymethods, - types::{PyDict, PyDictMethods, PyString}, + types::{IntoPyDict, PyDict, PyDictMethods, PyString}, Bound, FromPyObject, PyAny, PyRef, PyResult, Python, ToPyObject, }; use pyo3_asyncio_0_21::tokio::future_into_py; @@ -246,6 +246,33 @@ impl Table { ) } + pub fn list_versions(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + let versions = inner.list_versions().await.infer_error()?; + let versions_as_dict = Python::with_gil(|py| { + versions + .iter() + .map(|v| { + let dict = PyDict::new_bound(py); + dict.set_item("version", v.version).unwrap(); + dict.set_item( + "timestamp", + v.timestamp.timestamp_nanos_opt().unwrap_or_default(), + ) + .unwrap(); + + let tup: Vec<(&String, &String)> = v.metadata.iter().collect(); + dict.set_item("metadata", tup.into_py_dict(py)).unwrap(); + dict.to_object(py) + }) + .collect::>() + }); + + Ok(versions_as_dict) + }) + } + pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 4388a78d..11dbee1b 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -19,7 +19,7 @@ use http::header::CONTENT_TYPE; use http::StatusCode; use lance::arrow::json::JsonSchema; use lance::dataset::scanner::DatasetRecordBatchStream; -use lance::dataset::{ColumnAlteration, NewColumnTransform}; +use lance::dataset::{ColumnAlteration, NewColumnTransform, Version}; use lance_datafusion::exec::OneShotExec; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -363,6 +363,34 @@ impl TableInternal for RemoteTable { message: "restore is not supported on LanceDB cloud.".into(), }) } + + async fn list_versions(&self) -> Result> { + let request = self + .client + .post(&format!("/v1/table/{}/version/list/", self.name)); + let (request_id, response) = self.client.send(request, true).await?; + let response = self.check_table_response(&request_id, response).await?; + + #[derive(Deserialize)] + struct ListVersionsResponse { + versions: Vec, + } + + let body = response.text().await.err_to_http(request_id.clone())?; + let body: ListVersionsResponse = + serde_json::from_str(&body).map_err(|err| Error::Http { + source: format!( + "Failed to parse list_versions response: {}, body: {}", + err, body + ) + .into(), + request_id, + status_code: None, + })?; + + Ok(body.versions) + } + async fn schema(&self) -> Result { let schema = self.describe().await?.schema; Ok(Arc::new(schema.try_into()?)) @@ -775,6 +803,7 @@ mod tests { use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type}; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, Schema}; + use chrono::{DateTime, Utc}; use futures::{future::BoxFuture, StreamExt, TryFutureExt}; use lance_index::scalar::FullTextSearchQuery; use reqwest::Body; @@ -1489,6 +1518,51 @@ mod tests { assert_eq!(indices, expected); } + #[tokio::test] + async fn test_list_versions() { + let table = Table::new_with_handler("my_table", |request| { + assert_eq!(request.method(), "POST"); + assert_eq!(request.url().path(), "/v1/table/my_table/version/list/"); + + let version1 = lance::dataset::Version { + version: 1, + timestamp: "2024-01-01T00:00:00Z".parse().unwrap(), + metadata: Default::default(), + }; + let version2 = lance::dataset::Version { + version: 2, + timestamp: "2024-02-01T00:00:00Z".parse().unwrap(), + metadata: Default::default(), + }; + let response_body = serde_json::json!({ + "versions": [ + version1, + version2, + ] + }); + let response_body = serde_json::to_string(&response_body).unwrap(); + + http::Response::builder() + .status(200) + .body(response_body) + .unwrap() + }); + + let versions = table.list_versions().await.unwrap(); + assert_eq!(versions.len(), 2); + assert_eq!(versions[0].version, 1); + assert_eq!( + versions[0].timestamp, + "2024-01-01T00:00:00Z".parse::>().unwrap() + ); + assert_eq!(versions[1].version, 2); + assert_eq!( + versions[1].timestamp, + "2024-02-01T00:00:00Z".parse::>().unwrap() + ); + // assert_eq!(versions, expected); + } + #[tokio::test] async fn test_index_stats() { let table = Table::new_with_handler("my_table", |request| { diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 8b4f9cee..48a08913 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -37,7 +37,7 @@ pub use lance::dataset::ColumnAlteration; pub use lance::dataset::NewColumnTransform; pub use lance::dataset::ReadParams; use lance::dataset::{ - Dataset, UpdateBuilder as LanceUpdateBuilder, WhenMatched, WriteMode, WriteParams, + Dataset, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode, WriteParams, }; use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource}; use lance::io::WrappingObjectStore; @@ -426,6 +426,7 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn async fn checkout(&self, version: u64) -> Result<()>; async fn checkout_latest(&self) -> Result<()>; async fn restore(&self) -> Result<()>; + async fn list_versions(&self) -> Result>; async fn table_definition(&self) -> Result; fn dataset_uri(&self) -> &str; } @@ -955,6 +956,11 @@ impl Table { self.inner.restore().await } + /// List all the versions of the table + pub async fn list_versions(&self) -> Result> { + self.inner.list_versions().await + } + /// List all indices that have been created with [`Self::create_index`] pub async fn list_indices(&self) -> Result> { self.inner.list_indices().await @@ -1707,6 +1713,10 @@ impl TableInternal for NativeTable { self.dataset.reload().await } + async fn list_versions(&self) -> Result> { + Ok(self.dataset.get().await?.versions().await?) + } + async fn restore(&self) -> Result<()> { let version = self.dataset