From a9311c4dc07dc7bf8a0c1f4a458896e0d47b78f0 Mon Sep 17 00:00:00 2001 From: LuQQiu Date: Mon, 28 Apr 2025 10:04:46 -0700 Subject: [PATCH] feat: add list/create/delete/update/checkout tag API (#2353) add the tag related API to list existing tags, attach tag to a version, update the tag version, delete tag, get the version of the tag, and checkout the version that the tag bounded to. ## Summary by CodeRabbit - **New Features** - Introduced table version tagging, allowing users to create, update, delete, and list human-readable tags for specific table versions. - Enabled checking out a table by either version number or tag name. - Added new interfaces for tag management in both Python and Node.js APIs, supporting synchronous and asynchronous workflows. - **Bug Fixes** - None. - **Documentation** - Updated documentation to describe the new tagging features, including usage examples. - **Tests** - Added comprehensive tests for tag creation, updating, deletion, listing, and version checkout by tag in both Python and Node.js environments. --- Cargo.lock | 123 ++++++------- docs/src/js/classes/Table.md | 32 +++- docs/src/js/classes/TagContents.md | 35 ++++ docs/src/js/classes/Tags.md | 99 +++++++++++ docs/src/js/globals.md | 2 + nodejs/__test__/table.test.ts | 67 +++++++ nodejs/lancedb/index.ts | 2 + nodejs/lancedb/table.ts | 34 +++- nodejs/src/table.rs | 90 ++++++++++ python/python/lancedb/_lancedb.pyi | 17 +- python/python/lancedb/remote/table.py | 8 +- python/python/lancedb/table.py | 247 +++++++++++++++++++++++++- python/python/tests/test_table.py | 107 +++++++++++ python/src/table.rs | 113 ++++++++++-- rust/lancedb/src/remote/table.rs | 144 +++++++++++++++ rust/lancedb/src/table.rs | 142 +++++++++++++++ rust/lancedb/src/table/dataset.rs | 29 ++- 17 files changed, 1192 insertions(+), 99 deletions(-) create mode 100644 docs/src/js/classes/TagContents.md create mode 100644 docs/src/js/classes/Tags.md diff --git a/Cargo.lock b/Cargo.lock index cc8265b9..70cc3579 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,7 +25,7 @@ checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "const-random", - "getrandom 0.2.15", + "getrandom 0.2.16", "once_cell", "version_check", "zerocopy 0.7.35", @@ -512,9 +512,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.6.1" +version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c39646d1a6b51240a1a23bb57ea4eebede7e16fbc237fdc876980233dcecb4f" +checksum = "b6fcc63c9860579e4cb396239570e979376e70aab79e496621748a09913f8b36" dependencies = [ "aws-credential-types", "aws-runtime", @@ -542,9 +542,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.2" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4471bef4c22a06d2c7a1b6492493d3fdf24a805323109d6874f9c94d5906ac14" +checksum = "687bc16bc431a8533fe0097c7f0182874767f920989d7260950172ae8e3c4465" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -577,9 +577,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.6" +version = "1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aff45ffe35196e593ea3b9dd65b320e51e2dda95aff4390bc459e461d09c6ad" +checksum = "6c4063282c69991e57faab9e5cb21ae557e59f5b0fb285c196335243df8dc25c" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -594,7 +594,6 @@ dependencies = [ "fastrand", "http 0.2.12", "http-body 0.4.6", - "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -603,9 +602,9 @@ dependencies = [ [[package]] name = "aws-sdk-bedrockruntime" -version = "1.82.0" +version = "1.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb95f77abd4321348dd2f52a25e1de199732f54d2a35860ad20f5df21c66b44" +checksum = "b82a56e1a0c4b145031c3a99e68127eec0a4206ad34a5653ddf04afc18053376" dependencies = [ "aws-credential-types", "aws-runtime", @@ -629,9 +628,9 @@ dependencies = [ [[package]] name = "aws-sdk-dynamodb" -version = "1.71.2" +version = "1.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d49d08b1c99ca9a7de728a8975504857f2c24581a177f952e2a10244c305a1c" +checksum = "412cd587b03bacb2f7b94a5446cc77dee49a8fa848e636f9545df3aadbbfaf8b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -652,9 +651,9 @@ dependencies = [ [[package]] name = "aws-sdk-kms" -version = "1.65.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5325c5e2badf4148e850017cc56cc205888c6e0b52c9e29d3501ec577005230" +checksum = "655097cd83ab1f15575890943135192560f77097413c6dd1733fdbdc453e81ac" dependencies = [ "aws-credential-types", "aws-runtime", @@ -675,9 +674,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.82.0" +version = "1.83.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6eab2900764411ab01c8e91a76fd11a63b4e12bc3da97d9e14a0ce1343d86d3" +checksum = "51384750334005f40e1a334b0d54eca822a77eacdcf3c50fdf38f583c5eee7a2" dependencies = [ "aws-credential-types", "aws-runtime", @@ -710,9 +709,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.64.0" +version = "1.65.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d4bdb0e5f80f0689e61c77ab678b2b9304af329616af38aef5b6b967b8e736" +checksum = "8efec445fb78df585327094fcef4cad895b154b58711e504db7a93c41aa27151" dependencies = [ "aws-credential-types", "aws-runtime", @@ -733,9 +732,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.65.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbbb3ce8da257aedbccdcb1aadafbbb6a5fe9adf445db0e1ea897bdc7e22d08" +checksum = "5e49cca619c10e7b002dc8e66928ceed66ab7f56c1a3be86c5437bf2d8d89bba" dependencies = [ "aws-credential-types", "aws-runtime", @@ -756,9 +755,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.65.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a78a8f50a1630db757b60f679c8226a8a70ee2ab5f5e6e51dc67f6c61c7cfd" +checksum = "7420479eac0a53f776cc8f0d493841ffe58ad9d9783f3947be7265784471b47a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -780,9 +779,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d03c3c05ff80d54ff860fe38c726f6f494c639ae975203a101335f223386db" +checksum = "3503af839bd8751d0bdc5a46b9cac93a003a353e635b0c12cf2376b5b53e41ea" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -796,7 +795,6 @@ dependencies = [ "hmac", "http 0.2.12", "http 1.3.1", - "once_cell", "p256", "percent-encoding", "ring", @@ -853,9 +851,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.0" +version = "0.62.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5949124d11e538ca21142d1fba61ab0a2a2c1bc3ed323cdb3e4b878bfb83166" +checksum = "99335bec6cdc50a346fda1437f9fefe33abf8c99060739a546a16457f2862ca9" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -866,7 +864,6 @@ dependencies = [ "http 0.2.12", "http 1.3.1", "http-body 0.4.6", - "once_cell", "percent-encoding", "pin-project-lite", "pin-utils", @@ -912,12 +909,11 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445d065e76bc1ef54963db400319f1dd3ebb3e0a74af20f7f7630625b0cc7cc0" +checksum = "9364d5989ac4dd918e5cc4c4bdcc61c9be17dcd2586ea7f69e348fc7c6cab393" dependencies = [ "aws-smithy-runtime-api", - "once_cell", ] [[package]] @@ -932,9 +928,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.1" +version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0152749e17ce4d1b47c7747bdfec09dac1ccafdcbc741ebf9daa2a373356730f" +checksum = "14302f06d1d5b7d333fd819943075b13d27c7700b414f574c3c35859bfb55d5e" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -948,7 +944,6 @@ dependencies = [ "http 1.3.1", "http-body 0.4.6", "http-body 1.0.1", - "once_cell", "pin-project-lite", "pin-utils", "tokio", @@ -957,9 +952,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.4" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3da37cf5d57011cb1753456518ec76e31691f1f474b73934a284eb2a1c76510f" +checksum = "a1e5d9e3a80a18afa109391fb5ad09c3daf887b516c6fd805a157c6ea7994a57" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -974,9 +969,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836155caafba616c0ff9b07944324785de2ab016141c3550bd1c07882f8cee8f" +checksum = "40076bd09fadbc12d5e026ae080d0930defa606856186e31d83ccc6a255eeaf3" dependencies = [ "base64-simd", "bytes", @@ -1009,9 +1004,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.6" +version = "1.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3873f8deed8927ce8d04487630dc9ff73193bab64742a61d050e57a68dec4125" +checksum = "8a322fec39e4df22777ed3ad8ea868ac2f94cd15e1a55f6ee8d8d6305057689a" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -1028,7 +1023,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" dependencies = [ "futures-core", - "getrandom 0.2.15", + "getrandom 0.2.16", "instant", "pin-project-lite", "rand 0.8.5", @@ -1331,9 +1326,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.19" +version = "1.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" +checksum = "04da6a0d40b948dfc4fa8f5bbf402b0fc1a64a28dbf7d12ffd683550f2c1b63a" dependencies = [ "jobserver", "libc", @@ -1521,7 +1516,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "once_cell", "tiny-keccak", ] @@ -3002,9 +2997,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "js-sys", @@ -3808,7 +3803,7 @@ dependencies = [ "arrow-schema", "arrow-select", "bytes", - "getrandom 0.2.15", + "getrandom 0.2.16", "half", "num-traits", "rand 0.8.5", @@ -5282,7 +5277,7 @@ version = "0.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ea21b858b16b9c0e17a12db2800d11aa5b4bd182be6b3022eb537bbfc1f2db5" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "polars-arrow", "polars-core", "polars-error", @@ -5313,7 +5308,7 @@ dependencies = [ "ethnum", "fast-float", "foreign_vec", - "getrandom 0.2.15", + "getrandom 0.2.16", "hashbrown 0.14.5", "itoa", "itoap", @@ -5652,7 +5647,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.8.24", + "zerocopy 0.8.25", ] [[package]] @@ -5976,7 +5971,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", ] [[package]] @@ -6117,7 +6112,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "libredox", "thiserror 1.0.69", ] @@ -6128,7 +6123,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", "libredox", "thiserror 2.0.12", ] @@ -6275,7 +6270,7 @@ checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", - "getrandom 0.2.15", + "getrandom 0.2.16", "libc", "untrusted", "windows-sys 0.52.0", @@ -7411,7 +7406,7 @@ dependencies = [ "aho-corasick", "derive_builder", "esaxx-rs", - "getrandom 0.2.15", + "getrandom 0.2.16", "indicatif", "itertools 0.12.1", "lazy_static", @@ -7495,9 +7490,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", @@ -8415,9 +8410,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" +checksum = "6cb8234a863ea0e8cd7284fcdd4f145233eb00fee02bbdd9861aec44e6477bc5" dependencies = [ "memchr", ] @@ -8499,11 +8494,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.24" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" dependencies = [ - "zerocopy-derive 0.8.24", + "zerocopy-derive 0.8.25", ] [[package]] @@ -8519,9 +8514,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.24" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" dependencies = [ "proc-macro2", "quote", diff --git a/docs/src/js/classes/Table.md b/docs/src/js/classes/Table.md index 1dc54b8e..d80544f7 100644 --- a/docs/src/js/classes/Table.md +++ b/docs/src/js/classes/Table.md @@ -117,8 +117,8 @@ wish to return to standard mode, call `checkoutLatest`. #### Parameters -* **version**: `number` - The version to checkout +* **version**: `string` \| `number` + The version to checkout, could be version number or tag #### Returns @@ -615,6 +615,34 @@ of the given query *** +### tags() + +```ts +abstract tags(): Promise +``` + +Get a tags manager for this table. + +Tags allow you to label specific versions of a table with a human-readable name. +The returned tags manager can be used to list, create, update, or delete tags. + +#### Returns + +`Promise`<[`Tags`](Tags.md)> + +A tags manager for this table + +#### Example + +```typescript +const tagsManager = await table.tags(); +await tagsManager.create("v1", 1); +const tags = await tagsManager.list(); +console.log(tags); // { "v1": { version: 1, manifestSize: ... } } +``` + +*** + ### toArrow() ```ts diff --git a/docs/src/js/classes/TagContents.md b/docs/src/js/classes/TagContents.md new file mode 100644 index 00000000..68ea5481 --- /dev/null +++ b/docs/src/js/classes/TagContents.md @@ -0,0 +1,35 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / TagContents + +# Class: TagContents + +## Constructors + +### new TagContents() + +```ts +new TagContents(): TagContents +``` + +#### Returns + +[`TagContents`](TagContents.md) + +## Properties + +### manifestSize + +```ts +manifestSize: number; +``` + +*** + +### version + +```ts +version: number; +``` diff --git a/docs/src/js/classes/Tags.md b/docs/src/js/classes/Tags.md new file mode 100644 index 00000000..341673b3 --- /dev/null +++ b/docs/src/js/classes/Tags.md @@ -0,0 +1,99 @@ +[**@lancedb/lancedb**](../README.md) • **Docs** + +*** + +[@lancedb/lancedb](../globals.md) / Tags + +# Class: Tags + +## Constructors + +### new Tags() + +```ts +new Tags(): Tags +``` + +#### Returns + +[`Tags`](Tags.md) + +## Methods + +### create() + +```ts +create(tag, version): Promise +``` + +#### Parameters + +* **tag**: `string` + +* **version**: `number` + +#### Returns + +`Promise`<`void`> + +*** + +### delete() + +```ts +delete(tag): Promise +``` + +#### Parameters + +* **tag**: `string` + +#### Returns + +`Promise`<`void`> + +*** + +### getVersion() + +```ts +getVersion(tag): Promise +``` + +#### Parameters + +* **tag**: `string` + +#### Returns + +`Promise`<`number`> + +*** + +### list() + +```ts +list(): Promise> +``` + +#### Returns + +`Promise`<`Record`<`string`, [`TagContents`](TagContents.md)>> + +*** + +### update() + +```ts +update(tag, version): Promise +``` + +#### Parameters + +* **tag**: `string` + +* **version**: `number` + +#### Returns + +`Promise`<`void`> diff --git a/docs/src/js/globals.md b/docs/src/js/globals.md index 2b1d546c..2b00af61 100644 --- a/docs/src/js/globals.md +++ b/docs/src/js/globals.md @@ -27,6 +27,8 @@ - [QueryBase](classes/QueryBase.md) - [RecordBatchIterator](classes/RecordBatchIterator.md) - [Table](classes/Table.md) +- [TagContents](classes/TagContents.md) +- [Tags](classes/Tags.md) - [VectorColumnOptions](classes/VectorColumnOptions.md) - [VectorQuery](classes/VectorQuery.md) diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 0980df76..f9336004 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -1178,6 +1178,73 @@ describe("when dealing with versioning", () => { }); }); +describe("when dealing with tags", () => { + let tmpDir: tmp.DirResult; + beforeEach(() => { + tmpDir = tmp.dirSync({ unsafeCleanup: true }); + }); + afterEach(() => { + tmpDir.removeCallback(); + }); + + it("can manage tags", async () => { + const conn = await connect(tmpDir.name, { + readConsistencyInterval: 0, + }); + + const table = await conn.createTable("my_table", [ + { id: 1n, vector: [0.1, 0.2] }, + ]); + expect(await table.version()).toBe(1); + + await table.add([{ id: 2n, vector: [0.3, 0.4] }]); + expect(await table.version()).toBe(2); + + const tagsManager = await table.tags(); + + const initialTags = await tagsManager.list(); + expect(Object.keys(initialTags).length).toBe(0); + + const tag1 = "tag1"; + await tagsManager.create(tag1, 1); + expect(await tagsManager.getVersion(tag1)).toBe(1); + + const tagsAfterFirst = await tagsManager.list(); + expect(Object.keys(tagsAfterFirst).length).toBe(1); + expect(tagsAfterFirst).toHaveProperty(tag1); + expect(tagsAfterFirst[tag1].version).toBe(1); + + await tagsManager.create("tag2", 2); + expect(await tagsManager.getVersion("tag2")).toBe(2); + + const tagsAfterSecond = await tagsManager.list(); + expect(Object.keys(tagsAfterSecond).length).toBe(2); + expect(tagsAfterSecond).toHaveProperty(tag1); + expect(tagsAfterSecond[tag1].version).toBe(1); + expect(tagsAfterSecond).toHaveProperty("tag2"); + expect(tagsAfterSecond["tag2"].version).toBe(2); + + await table.add([{ id: 3n, vector: [0.5, 0.6] }]); + await tagsManager.update(tag1, 3); + expect(await tagsManager.getVersion(tag1)).toBe(3); + + await tagsManager.delete("tag2"); + const tagsAfterDelete = await tagsManager.list(); + expect(Object.keys(tagsAfterDelete).length).toBe(1); + expect(tagsAfterDelete).toHaveProperty(tag1); + expect(tagsAfterDelete[tag1].version).toBe(3); + + await table.add([{ id: 4n, vector: [0.7, 0.8] }]); + expect(await table.version()).toBe(4); + + await table.checkout(tag1); + expect(await table.version()).toBe(3); + + await table.checkoutLatest(); + expect(await table.version()).toBe(4); + }); +}); + describe("when optimizing a dataset", () => { let tmpDir: tmp.DirResult; let table: Table; diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index 969bb396..f83b1d56 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -23,6 +23,8 @@ export { OptimizeStats, CompactionStats, RemovalStats, + Tags, + TagContents, } from "./native.js"; export { diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 54481656..ebe8a2e4 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -20,6 +20,7 @@ import { IndexConfig, IndexStatistics, OptimizeStats, + Tags, Table as _NativeTable, } from "./native"; import { @@ -374,7 +375,7 @@ export abstract class Table { * * Calling this method will set the table into time-travel mode. If you * wish to return to standard mode, call `checkoutLatest`. - * @param {number} version The version to checkout + * @param {number | string} version The version to checkout, could be version number or tag * @example * ```typescript * import * as lancedb from "@lancedb/lancedb" @@ -390,7 +391,8 @@ export abstract class Table { * console.log(await table.version()); // 2 * ``` */ - abstract checkout(version: number): Promise; + abstract checkout(version: number | string): Promise; + /** * Checkout the latest version of the table. _This is an in-place operation._ * @@ -404,6 +406,23 @@ export abstract class Table { */ abstract listVersions(): Promise; + /** + * Get a tags manager for this table. + * + * Tags allow you to label specific versions of a table with a human-readable name. + * The returned tags manager can be used to list, create, update, or delete tags. + * + * @returns {Tags} A tags manager for this table + * @example + * ```typescript + * const tagsManager = await table.tags(); + * await tagsManager.create("v1", 1); + * const tags = await tagsManager.list(); + * console.log(tags); // { "v1": { version: 1, manifestSize: ... } } + * ``` + */ + abstract tags(): Promise; + /** * Restore the table to the currently checked out version * @@ -699,8 +718,11 @@ export class LocalTable extends Table { return await this.inner.version(); } - async checkout(version: number): Promise { - await this.inner.checkout(version); + async checkout(version: number | string): Promise { + if (typeof version === "string") { + return this.inner.checkoutTag(version); + } + return this.inner.checkout(version); } async checkoutLatest(): Promise { @@ -719,6 +741,10 @@ export class LocalTable extends Table { await this.inner.restore(); } + async tags(): Promise { + return await this.inner.tags(); + } + async optimize(options?: Partial): Promise { let cleanupOlderThanMs; if ( diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 66f480c8..2111f82c 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -249,6 +249,14 @@ impl Table { .default_error() } + #[napi(catch_unwind)] + pub async fn checkout_tag(&self, tag: String) -> napi::Result<()> { + self.inner_ref()? + .checkout_tag(tag.as_str()) + .await + .default_error() + } + #[napi(catch_unwind)] pub async fn checkout_latest(&self) -> napi::Result<()> { self.inner_ref()?.checkout_latest().await.default_error() @@ -281,6 +289,13 @@ impl Table { self.inner_ref()?.restore().await.default_error() } + #[napi(catch_unwind)] + pub async fn tags(&self) -> napi::Result { + Ok(Tags { + inner: self.inner_ref()?.clone(), + }) + } + #[napi(catch_unwind)] pub async fn optimize( &self, @@ -546,3 +561,78 @@ pub struct Version { pub timestamp: i64, pub metadata: HashMap, } + +#[napi] +pub struct TagContents { + pub version: i64, + pub manifest_size: i64, +} + +#[napi] +pub struct Tags { + inner: LanceDbTable, +} + +#[napi] +impl Tags { + #[napi] + pub async fn list(&self) -> napi::Result> { + let rust_tags = self.inner.tags().await.default_error()?; + let tag_list = rust_tags.as_ref().list().await.default_error()?; + let tag_contents = tag_list + .into_iter() + .map(|(k, v)| { + ( + k, + TagContents { + version: v.version as i64, + manifest_size: v.manifest_size as i64, + }, + ) + }) + .collect(); + + Ok(tag_contents) + } + + #[napi] + pub async fn get_version(&self, tag: String) -> napi::Result { + let rust_tags = self.inner.tags().await.default_error()?; + rust_tags + .as_ref() + .get_version(tag.as_str()) + .await + .map(|v| v as i64) + .default_error() + } + + #[napi] + pub async unsafe fn create(&mut self, tag: String, version: i64) -> napi::Result<()> { + let mut rust_tags = self.inner.tags().await.default_error()?; + rust_tags + .as_mut() + .create(tag.as_str(), version as u64) + .await + .default_error() + } + + #[napi] + pub async unsafe fn delete(&mut self, tag: String) -> napi::Result<()> { + let mut rust_tags = self.inner.tags().await.default_error()?; + rust_tags + .as_mut() + .delete(tag.as_str()) + .await + .default_error() + } + + #[napi] + pub async unsafe fn update(&mut self, tag: String, version: i64) -> napi::Result<()> { + let mut rust_tags = self.inner.tags().await.default_error()?; + rust_tags + .as_mut() + .update(tag.as_str(), version as u64) + .await + .default_error() + } +} diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 3ac0d67e..ee744f60 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Dict, List, Optional, Tuple, Any, Union, Literal +from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal import pyarrow as pa @@ -47,7 +47,7 @@ class Table: ): ... async def list_versions(self) -> List[Dict[str, Any]]: ... async def version(self) -> int: ... - async def checkout(self, version: int): ... + async def checkout(self, version: Union[int, str]): ... async def checkout_latest(self): ... async def restore(self, version: Optional[int] = None): ... async def list_indices(self) -> list[IndexConfig]: ... @@ -61,9 +61,18 @@ class Table: cleanup_since_ms: Optional[int] = None, delete_unverified: Optional[bool] = None, ) -> OptimizeStats: ... + @property + def tags(self) -> Tags: ... def query(self) -> Query: ... def vector_search(self) -> VectorQuery: ... +class Tags: + async def list(self) -> Dict[str, Tag]: ... + async def get_version(self, tag: str) -> int: ... + async def create(self, tag: str, version: int): ... + async def delete(self, tag: str): ... + async def update(self, tag: str, version: int): ... + class IndexConfig: index_type: str columns: List[str] @@ -195,3 +204,7 @@ class RemovalStats: class OptimizeStats: compaction: CompactionStats prune: RemovalStats + +class Tag(TypedDict): + version: int + manifest_size: int diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 81a1d754..16f7ea81 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -18,7 +18,7 @@ from lancedb.merge import LanceMergeInsertBuilder from lancedb.embeddings import EmbeddingFunctionRegistry from ..query import LanceVectorQueryBuilder, LanceQueryBuilder -from ..table import AsyncTable, IndexStatistics, Query, Table +from ..table import AsyncTable, IndexStatistics, Query, Table, Tags class RemoteTable(Table): @@ -54,6 +54,10 @@ class RemoteTable(Table): """Get the current version of the table""" return LOOP.run(self._table.version()) + @property + def tags(self) -> Tags: + return Tags(self._table) + @cached_property def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]: """ @@ -81,7 +85,7 @@ class RemoteTable(Table): """to_pandas() is not yet supported on LanceDB cloud.""" return NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.") - def checkout(self, version: int): + def checkout(self, version: Union[int, str]): return LOOP.run(self._table.checkout(version)) def checkout_latest(self): diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index b8b09fd3..5ab268ee 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -77,6 +77,7 @@ if TYPE_CHECKING: OptimizeStats, CleanupStats, CompactionStats, + Tag, ) from .db import LanceDBConnection from .index import IndexConfig @@ -582,6 +583,35 @@ class Table(ABC): """ raise NotImplementedError + @property + @abstractmethod + def tags(self) -> Tags: + """Tag management for the table. + + Similar to Git, tags are a way to add metadata to a specific version of the + table. + + .. warning:: + + Tagged versions are exempted from the :py:meth:`cleanup_old_versions()` + process. + + To remove a version that has been tagged, you must first + :py:meth:`~Tags.delete` the associated tag. + + Examples + -------- + + .. code-block:: python + + table = db.open_table("my_table") + table.tags.create("v2-prod-20250203", 10) + + tags = table.tags.list() + + """ + raise NotImplementedError + @property @abstractmethod def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]: @@ -1354,7 +1384,7 @@ class Table(ABC): """ @abstractmethod - def checkout(self, version: int): + def checkout(self, version: Union[int, str]): """ Checks out a specific version of the Table @@ -1369,6 +1399,12 @@ class Table(ABC): Any operation that modifies the table will fail while the table is in a checked out state. + Parameters + ---------- + version: int | str, + The version to check out. A version number (`int`) or a tag + (`str`) can be provided. + To return the table to a normal state use `[Self::checkout_latest]` """ @@ -1538,7 +1574,45 @@ class LanceTable(Table): """Get the current version of the table""" return LOOP.run(self._table.version()) - def checkout(self, version: int): + @property + def tags(self) -> Tags: + """Tag management for the table. + + Similar to Git, tags are a way to add metadata to a specific version of the + table. + + .. warning:: + + Tagged versions are exempted from the :py:meth:`cleanup_old_versions()` + process. + + To remove a version that has been tagged, you must first + :py:meth:`~Tags.delete` the associated tag. + + Returns + ------- + Tags + The tag manager for managing tags for the table. + + Examples + -------- + >>> import lancedb + >>> db = lancedb.connect("./.lancedb") + >>> table = db.create_table("my_table", + ... [{"vector": [1.1, 0.9], "type": "vector"}]) + >>> table.tags.create("v1", table.version) + >>> table.add([{"vector": [0.5, 0.2], "type": "vector"}]) + >>> tags = table.tags.list() + >>> print(tags["v1"]["version"]) + 1 + >>> table.checkout("v1") + >>> table.to_pandas() + vector type + 0 [1.1, 0.9] vector + """ + return Tags(self._table) + + def checkout(self, version: Union[int, str]): """Checkout a version of the table. This is an in-place operation. This allows viewing previous versions of the table. If you wish to @@ -1550,8 +1624,9 @@ class LanceTable(Table): Parameters ---------- - version : int - The version to checkout. + version: int | str, + The version to check out. A version number (`int`) or a tag + (`str`) can be provided. Examples -------- @@ -3746,7 +3821,7 @@ class AsyncTable: return versions - async def checkout(self, version: int): + async def checkout(self, version: int | str): """ Checks out a specific version of the Table @@ -3761,6 +3836,12 @@ class AsyncTable: Any operation that modifies the table will fail while the table is in a checked out state. + Parameters + ---------- + version: int | str, + The version to check out. A version number (`int`) or a tag + (`str`) can be provided. + To return the table to a normal state use `[Self::checkout_latest]` """ try: @@ -3798,6 +3879,24 @@ class AsyncTable: """ await self._inner.restore(version) + @property + def tags(self) -> AsyncTags: + """Tag management for the dataset. + + Similar to Git, tags are a way to add metadata to a specific version of the + dataset. + + .. warning:: + + Tagged versions are exempted from the + :py:meth:`optimize(cleanup_older_than)` process. + + To remove a version that has been tagged, you must first + :py:meth:`~Tags.delete` the associated tag. + + """ + return AsyncTags(self._inner) + async def optimize( self, *, @@ -3967,3 +4066,141 @@ class IndexStatistics: # a dictionary instead of a class. def __getitem__(self, key): return getattr(self, key) + + +class Tags: + """ + Table tag manager. + """ + + def __init__(self, table): + self._table = table + + def list(self) -> Dict[str, Tag]: + """ + List all table tags. + + Returns + ------- + dict[str, Tag] + A dictionary mapping tag names to version numbers. + """ + return LOOP.run(self._table.tags.list()) + + def get_version(self, tag: str) -> int: + """ + Get the version of a tag. + + Parameters + ---------- + tag: str, + The name of the tag to get the version for. + """ + return LOOP.run(self._table.tags.get_version(tag)) + + def create(self, tag: str, version: int) -> None: + """ + Create a tag for a given table version. + + Parameters + ---------- + tag: str, + The name of the tag to create. This name must be unique among all tag + names for the table. + version: int, + The table version to tag. + """ + LOOP.run(self._table.tags.create(tag, version)) + + def delete(self, tag: str) -> None: + """ + Delete tag from the table. + + Parameters + ---------- + tag: str, + The name of the tag to delete. + """ + LOOP.run(self._table.tags.delete(tag)) + + def update(self, tag: str, version: int) -> None: + """ + Update tag to a new version. + + Parameters + ---------- + tag: str, + The name of the tag to update. + version: int, + The new table version to tag. + """ + LOOP.run(self._table.tags.update(tag, version)) + + +class AsyncTags: + """ + Async table tag manager. + """ + + def __init__(self, table): + self._table = table + + async def list(self) -> Dict[str, Tag]: + """ + List all table tags. + + Returns + ------- + dict[str, Tag] + A dictionary mapping tag names to version numbers. + """ + return await self._table.tags.list() + + async def get_version(self, tag: str) -> int: + """ + Get the version of a tag. + + Parameters + ---------- + tag: str, + The name of the tag to get the version for. + """ + return await self._table.tags.get_version(tag) + + async def create(self, tag: str, version: int) -> None: + """ + Create a tag for a given table version. + + Parameters + ---------- + tag: str, + The name of the tag to create. This name must be unique among all tag + names for the table. + version: int, + The table version to tag. + """ + await self._table.tags.create(tag, version) + + async def delete(self, tag: str) -> None: + """ + Delete tag from the table. + + Parameters + ---------- + tag: str, + The name of the tag to delete. + """ + await self._table.tags.delete(tag) + + async def update(self, tag: str, version: int) -> None: + """ + Update tag to a new version. + + Parameters + ---------- + tag: str, + The name of the tag to update. + version: int, + The new table version to tag. + """ + await self._table.tags.update(tag, version) diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index 6624de19..db7e5f61 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -529,6 +529,113 @@ def test_versioning(mem_db: DBConnection): assert len(table) == 2 +def test_tags(mem_db: DBConnection): + table = mem_db.create_table( + "test", + data=[ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + ) + + table.tags.create("tag1", 1) + tags = table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 1 + + table.add( + data=[ + {"vector": [10.0, 11.0], "item": "baz", "price": 30.0}, + ], + ) + + table.tags.create("tag2", 2) + tags = table.tags.list() + assert "tag1" in tags + assert "tag2" in tags + assert tags["tag1"]["version"] == 1 + assert tags["tag2"]["version"] == 2 + + table.tags.delete("tag2") + table.tags.update("tag1", 2) + tags = table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 2 + + table.tags.update("tag1", 1) + tags = table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 1 + + table.checkout("tag1") + assert table.version == 1 + assert table.count_rows() == 2 + table.tags.create("tag2", 2) + table.checkout("tag2") + assert table.version == 2 + assert table.count_rows() == 3 + table.checkout_latest() + table.add( + data=[ + {"vector": [12.0, 13.0], "item": "baz", "price": 40.0}, + ], + ) + + +@pytest.mark.asyncio +async def test_async_tags(mem_db_async: AsyncConnection): + table = await mem_db_async.create_table( + "test", + data=[ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + ) + + await table.tags.create("tag1", 1) + tags = await table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 1 + + await table.add( + data=[ + {"vector": [10.0, 11.0], "item": "baz", "price": 30.0}, + ], + ) + + await table.tags.create("tag2", 2) + tags = await table.tags.list() + assert "tag1" in tags + assert "tag2" in tags + assert tags["tag1"]["version"] == 1 + assert tags["tag2"]["version"] == 2 + + await table.tags.delete("tag2") + await table.tags.update("tag1", 2) + tags = await table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 2 + + await table.tags.update("tag1", 1) + tags = await table.tags.list() + assert "tag1" in tags + assert tags["tag1"]["version"] == 1 + + await table.checkout("tag1") + assert await table.version() == 1 + assert await table.count_rows() == 2 + await table.tags.create("tag2", 2) + await table.checkout("tag2") + assert await table.version() == 2 + assert await table.count_rows() == 3 + await table.checkout_latest() + await table.add( + data=[ + {"vector": [12.0, 13.0], "item": "baz", "price": 40.0}, + ], + ) + + @patch("lancedb.table.AsyncTable.create_index") def test_create_index_method(mock_create_index, mem_db: DBConnection): table = mem_db.create_table( diff --git a/python/src/table.rs b/python/src/table.rs index 7575f0d4..431c54cc 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -2,6 +2,11 @@ // SPDX-FileCopyrightText: Copyright The LanceDB Authors use std::{collections::HashMap, sync::Arc}; +use crate::{ + error::PythonErrorExt, + index::{extract_index_params, IndexConfig}, + query::Query, +}; use arrow::{ datatypes::{DataType, Schema}, ffi_stream::ArrowArrayStreamReader, @@ -12,19 +17,13 @@ use lancedb::table::{ Table as LanceDbTable, }; use pyo3::{ - exceptions::{PyKeyError, PyRuntimeError, PyValueError}, + exceptions::{PyIOError, PyKeyError, PyRuntimeError, PyValueError}, pyclass, pymethods, - types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods}, - Bound, FromPyObject, PyAny, PyRef, PyResult, Python, + types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods, PyInt, PyString}, + Bound, FromPyObject, PyAny, PyObject, PyRef, PyResult, Python, }; use pyo3_async_runtimes::tokio::future_into_py; -use crate::{ - error::PythonErrorExt, - index::{extract_index_params, IndexConfig}, - query::Query, -}; - /// Statistics about a compaction operation. #[pyclass(get_all)] #[derive(Clone, Debug)] @@ -322,10 +321,26 @@ impl Table { }) } - pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult> { + pub fn checkout(self_: PyRef<'_, Self>, version: PyObject) -> PyResult> { let inner = self_.inner_ref()?.clone(); - future_into_py(self_.py(), async move { - inner.checkout(version).await.infer_error() + let py = self_.py(); + let (is_int, int_value, string_value) = if let Ok(i) = version.downcast_bound::(py) { + let num: u64 = i.extract()?; + (true, num, String::new()) + } else if let Ok(s) = version.downcast_bound::(py) { + let str_value = s.to_string(); + (false, 0, str_value) + } else { + return Err(PyIOError::new_err( + "version must be an integer or a string.", + )); + }; + future_into_py(py, async move { + if is_int { + inner.checkout(int_value).await.infer_error() + } else { + inner.checkout_tag(&string_value).await.infer_error() + } }) } @@ -352,6 +367,11 @@ impl Table { Query::new(self.inner_ref().unwrap().query()) } + #[getter] + pub fn tags(&self) -> PyResult { + Ok(Tags::new(self.inner_ref()?.clone())) + } + /// Optimize the on-disk data by compacting and pruning old data, for better performance. #[pyo3(signature = (cleanup_since_ms=None, delete_unverified=None, retrain=None))] pub fn optimize( @@ -586,3 +606,72 @@ pub struct MergeInsertParams { when_not_matched_by_source_delete: bool, when_not_matched_by_source_condition: Option, } + +#[pyclass] +pub struct Tags { + inner: LanceDbTable, +} + +impl Tags { + pub fn new(table: LanceDbTable) -> Self { + Self { inner: table } + } +} + +#[pymethods] +impl Tags { + pub fn list(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.inner.clone(); + future_into_py(self_.py(), async move { + let tags = inner.tags().await.infer_error()?; + let res = tags.list().await.infer_error()?; + + Python::with_gil(|py| { + let py_dict = PyDict::new(py); + for (key, contents) in res { + let value_dict = PyDict::new(py); + value_dict.set_item("version", contents.version)?; + value_dict.set_item("manifest_size", contents.manifest_size)?; + py_dict.set_item(key, value_dict)?; + } + Ok(py_dict.unbind()) + }) + }) + } + + pub fn get_version(self_: PyRef<'_, Self>, tag: String) -> PyResult> { + let inner = self_.inner.clone(); + future_into_py(self_.py(), async move { + let tags = inner.tags().await.infer_error()?; + let res = tags.get_version(tag.as_str()).await.infer_error()?; + Ok(res) + }) + } + + pub fn create(self_: PyRef, tag: String, version: u64) -> PyResult> { + let inner = self_.inner.clone(); + future_into_py(self_.py(), async move { + let mut tags = inner.tags().await.infer_error()?; + tags.create(tag.as_str(), version).await.infer_error()?; + Ok(()) + }) + } + + pub fn delete(self_: PyRef, tag: String) -> PyResult> { + let inner = self_.inner.clone(); + future_into_py(self_.py(), async move { + let mut tags = inner.tags().await.infer_error()?; + tags.delete(tag.as_str()).await.infer_error()?; + Ok(()) + }) + } + + pub fn update(self_: PyRef, tag: String, version: u64) -> PyResult> { + let inner = self_.inner.clone(); + future_into_py(self_.py(), async move { + let mut tags = inner.tags().await.infer_error()?; + tags.update(tag.as_str(), version).await.infer_error()?; + Ok(()) + }) + } +} diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index ba7662e5..0b3a6dd0 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -4,6 +4,7 @@ use crate::index::Index; use crate::index::IndexStatistics; use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest}; +use crate::table::Tags; use crate::table::{AddDataMode, AnyQuery, Filter}; use crate::utils::{supported_btree_data_type, supported_vector_data_type}; use crate::{DistanceType, Error, Table}; @@ -18,11 +19,13 @@ use futures::TryStreamExt; use http::header::CONTENT_TYPE; use http::{HeaderName, StatusCode}; use lance::arrow::json::{JsonDataType, JsonSchema}; +use lance::dataset::refs::TagContents; use lance::dataset::scanner::DatasetRecordBatchStream; use lance::dataset::{ColumnAlteration, NewColumnTransform, Version}; use lance_datafusion::exec::{execute_plan, OneShotExec}; use reqwest::{RequestBuilder, Response}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::io::Cursor; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -47,6 +50,137 @@ use crate::{ const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); +pub struct RemoteTags<'a, S: HttpSend = Sender> { + inner: &'a RemoteTable, +} + +#[async_trait] +impl Tags for RemoteTags<'_, S> { + async fn list(&self) -> Result> { + let request = self + .inner + .client + .post(&format!("/v1/table/{}/tags/list/", self.inner.name)); + let (request_id, response) = self.inner.send(request, true).await?; + let response = self + .inner + .check_table_response(&request_id, response) + .await?; + + match response.text().await { + Ok(body) => { + // Explicitly tell serde_json what type we want to deserialize into + let tags_map: HashMap = + serde_json::from_str(&body).map_err(|e| Error::Http { + source: format!("Failed to parse tags list: {}", e).into(), + request_id, + status_code: None, + })?; + + Ok(tags_map) + } + Err(err) => { + let status_code = err.status(); + Err(Error::Http { + source: Box::new(err), + request_id, + status_code, + }) + } + } + } + + async fn get_version(&self, tag: &str) -> Result { + let request = self + .inner + .client + .post(&format!("/v1/table/{}/tags/version/", self.inner.name)) + .json(&serde_json::json!({ "tag": tag })); + + let (request_id, response) = self.inner.send(request, true).await?; + let response = self + .inner + .check_table_response(&request_id, response) + .await?; + + match response.text().await { + Ok(body) => { + let value: serde_json::Value = + serde_json::from_str(&body).map_err(|e| Error::Http { + source: format!("Failed to parse tag version: {}", e).into(), + request_id: request_id.clone(), + status_code: None, + })?; + + value + .get("version") + .and_then(|v| v.as_u64()) + .ok_or_else(|| Error::Http { + source: format!("Invalid tag version response: {}", body).into(), + request_id, + status_code: None, + }) + } + Err(err) => { + let status_code = err.status(); + Err(Error::Http { + source: Box::new(err), + request_id, + status_code, + }) + } + } + } + + async fn create(&mut self, tag: &str, version: u64) -> Result<()> { + let request = self + .inner + .client + .post(&format!("/v1/table/{}/tags/create/", self.inner.name)) + .json(&serde_json::json!({ + "tag": tag, + "version": version + })); + + let (request_id, response) = self.inner.send(request, true).await?; + self.inner + .check_table_response(&request_id, response) + .await?; + Ok(()) + } + + async fn delete(&mut self, tag: &str) -> Result<()> { + let request = self + .inner + .client + .post(&format!("/v1/table/{}/tags/delete/", self.inner.name)) + .json(&serde_json::json!({ "tag": tag })); + + let (request_id, response) = self.inner.send(request, true).await?; + self.inner + .check_table_response(&request_id, response) + .await?; + Ok(()) + } + + async fn update(&mut self, tag: &str, version: u64) -> Result<()> { + let request = self + .inner + .client + .post(&format!("/v1/table/{}/tags/update/", self.inner.name)) + .json(&serde_json::json!({ + "tag": tag, + "version": version + })); + + let (request_id, response) = self.inner.send(request, true).await?; + self.inner + .check_table_response(&request_id, response) + .await?; + Ok(()) + } +} + #[derive(Debug)] pub struct RemoteTable { #[allow(dead_code)] @@ -905,6 +1039,16 @@ impl BaseTable for RemoteTable { Ok(()) } + async fn tags(&self) -> Result> { + Ok(Box::new(RemoteTags { inner: self })) + } + async fn checkout_tag(&self, tag: &str) -> Result<()> { + let tags = self.tags().await?; + let version = tags.get_version(tag).await?; + let mut write_guard = self.version.write().await; + *write_guard = Some(version); + Ok(()) + } async fn optimize(&self, _action: OptimizeAction) -> Result { self.check_mutable().await?; Err(Error::NotSupported { diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 29f0edff..4763b526 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -81,6 +81,7 @@ pub mod merge; use crate::index::waiter::wait_for_index; pub use chrono::Duration; pub use lance::dataset::optimize::CompactionOptions; +pub use lance::dataset::refs::{TagContents, Tags as LanceTags}; pub use lance::dataset::scanner::DatasetRecordBatchStream; pub use lance_index::optimize::OptimizeOptions; @@ -401,6 +402,24 @@ pub enum AnyQuery { VectorQuery(VectorQueryRequest), } +#[async_trait] +pub trait Tags: Send + Sync { + /// List the tags of the table. + async fn list(&self) -> Result>; + + /// Get the version of the table referenced by a tag. + async fn get_version(&self, tag: &str) -> Result; + + /// Create a new tag for the given version of the table. + async fn create(&mut self, tag: &str, version: u64) -> Result<()>; + + /// Delete a tag from the table. + async fn delete(&mut self, tag: &str) -> Result<()>; + + /// Update an existing tag to point to a new version of the table. + async fn update(&mut self, tag: &str, version: u64) -> Result<()>; +} + /// A trait for anything "table-like". This is used for both native tables (which target /// Lance datasets) and remote tables (which target LanceDB cloud) /// @@ -466,6 +485,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { params: MergeInsertBuilder, new_data: Box, ) -> Result<()>; + /// Gets the table tag manager. + async fn tags(&self) -> Result>; /// Optimize the dataset. async fn optimize(&self, action: OptimizeAction) -> Result; /// Add columns to the table. @@ -482,6 +503,9 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { async fn version(&self) -> Result; /// Checkout a specific version of the table. async fn checkout(&self, version: u64) -> Result<()>; + /// Checkout a table version referenced by a tag. + /// Tags provide a human-readable way to reference specific versions of the table. + async fn checkout_tag(&self, tag: &str) -> Result<()>; /// Checkout the latest version of the table. async fn checkout_latest(&self) -> Result<()>; /// Restore the table to the currently checked out version. @@ -1058,6 +1082,24 @@ impl Table { self.inner.checkout(version).await } + /// Checks out a specific version of the Table by tag + /// + /// Any read operation on the table will now access the data at the version referenced by the tag. + /// As a consequence, calling this method will disable any read consistency interval + /// that was previously set. + /// + /// This is a read-only operation that turns the table into a sort of "view" + /// or "detached head". Other table instances will not be affected. To make the change + /// permanent you can use the `[Self::restore]` method. + /// + /// Any operation that modifies the table will fail while the table is in a checked + /// out state. + /// + /// To return the table to a normal state use `[Self::checkout_latest]` + pub async fn checkout_tag(&self, tag: &str) -> Result<()> { + self.inner.checkout_tag(tag).await + } + /// Ensures the table is pointing at the latest version /// /// This can be used to manually update a table when the read_consistency_interval is None @@ -1144,6 +1186,11 @@ impl Table { self.inner.wait_for_index(index_names, timeout).await } + /// Get the tags manager. + pub async fn tags(&self) -> Result> { + self.inner.tags().await + } + // Take many execution plans and map them into a single plan that adds // a query_index column and unions them. pub(crate) fn multi_vector_plan( @@ -1196,6 +1243,35 @@ impl Table { } } +pub struct NativeTags { + inner: LanceTags, +} +#[async_trait] +impl Tags for NativeTags { + async fn list(&self) -> Result> { + Ok(self.inner.list().await?) + } + + async fn get_version(&self, tag: &str) -> Result { + Ok(self.inner.get_version(tag).await?) + } + + async fn create(&mut self, tag: &str, version: u64) -> Result<()> { + self.inner.create(tag, version).await?; + Ok(()) + } + + async fn delete(&mut self, tag: &str) -> Result<()> { + self.inner.delete(tag).await?; + Ok(()) + } + + async fn update(&mut self, tag: &str, version: u64) -> Result<()> { + self.inner.update(tag, version).await?; + Ok(()) + } +} + impl From for Table { fn from(table: NativeTable) -> Self { Self::new(Arc::new(table)) @@ -1940,6 +2016,10 @@ impl BaseTable for NativeTable { self.dataset.as_time_travel(version).await } + async fn checkout_tag(&self, tag: &str) -> Result<()> { + self.dataset.as_time_travel(tag).await + } + async fn checkout_latest(&self) -> Result<()> { self.dataset .as_latest(self.read_consistency_interval) @@ -2315,6 +2395,14 @@ impl BaseTable for NativeTable { Ok(()) } + async fn tags(&self) -> Result> { + let dataset = self.dataset.get().await?; + + Ok(Box::new(NativeTags { + inner: dataset.tags.clone(), + })) + } + async fn optimize(&self, action: OptimizeAction) -> Result { let mut stats = OptimizeStats { compaction: None, @@ -3081,6 +3169,60 @@ mod tests { ) } + #[tokio::test] + async fn test_tags() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn + .create_table("my_table", some_sample_data()) + .execute() + .await + .unwrap(); + assert_eq!(table.version().await.unwrap(), 1); + table.add(some_sample_data()).execute().await.unwrap(); + assert_eq!(table.version().await.unwrap(), 2); + let mut tags_manager = table.tags().await.unwrap(); + let tags = tags_manager.list().await.unwrap(); + assert!(tags.is_empty(), "Tags should be empty initially"); + let tag1 = "tag1"; + tags_manager.create(tag1, 1).await.unwrap(); + assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1); + let tags = tags_manager.list().await.unwrap(); + assert_eq!(tags.len(), 1); + assert!(tags.contains_key(tag1)); + assert_eq!(tags.get(tag1).unwrap().version, 1); + tags_manager.create("tag2", 2).await.unwrap(); + assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2); + let tags = tags_manager.list().await.unwrap(); + assert_eq!(tags.len(), 2); + assert!(tags.contains_key(tag1)); + assert_eq!(tags.get(tag1).unwrap().version, 1); + assert!(tags.contains_key("tag2")); + assert_eq!(tags.get("tag2").unwrap().version, 2); + // Test update and delete + table.add(some_sample_data()).execute().await.unwrap(); + tags_manager.update(tag1, 3).await.unwrap(); + assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3); + tags_manager.delete("tag2").await.unwrap(); + let tags = tags_manager.list().await.unwrap(); + assert_eq!(tags.len(), 1); + assert!(tags.contains_key(tag1)); + assert_eq!(tags.get(tag1).unwrap().version, 3); + // Test checkout tag + table.add(some_sample_data()).execute().await.unwrap(); + assert_eq!(table.version().await.unwrap(), 4); + table.checkout_tag(tag1).await.unwrap(); + assert_eq!(table.version().await.unwrap(), 3); + table.checkout_latest().await.unwrap(); + assert_eq!(table.version().await.unwrap(), 4); + } + #[tokio::test] async fn test_create_index() { use arrow_array::RecordBatch; diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index eb379ab1..9d013718 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -7,7 +7,7 @@ use std::{ time::{self, Duration, Instant}, }; -use lance::Dataset; +use lance::{dataset::refs, Dataset}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use crate::error::Result; @@ -83,19 +83,32 @@ impl DatasetRef { } } - async fn as_time_travel(&mut self, target_version: u64) -> Result<()> { + async fn as_time_travel(&mut self, target_version: impl Into) -> Result<()> { + let target_ref = target_version.into(); + match self { Self::Latest { dataset, .. } => { + let new_dataset = dataset.checkout_version(target_ref.clone()).await?; + let version_value = new_dataset.version().version; + *self = Self::TimeTravel { - dataset: dataset.checkout_version(target_version).await?, - version: target_version, + dataset: new_dataset, + version: version_value, }; } Self::TimeTravel { dataset, version } => { - if *version != target_version { + let should_checkout = match &target_ref { + refs::Ref::Version(target_ver) => version != target_ver, + refs::Ref::Tag(_) => true, // Always checkout for tags + }; + + if should_checkout { + let new_dataset = dataset.checkout_version(target_ref).await?; + let version_value = new_dataset.version().version; + *self = Self::TimeTravel { - dataset: dataset.checkout_version(target_version).await?, - version: target_version, + dataset: new_dataset, + version: version_value, }; } } @@ -175,7 +188,7 @@ impl DatasetConsistencyWrapper { write_guard.as_latest(read_consistency_interval).await } - pub async fn as_time_travel(&self, target_version: u64) -> Result<()> { + pub async fn as_time_travel(&self, target_version: impl Into) -> Result<()> { self.0.write().await.as_time_travel(target_version).await }