Compare commits

...

11 Commits

Author SHA1 Message Date
Jack Ye
c42848ae78 feat: add native OAuth/OIDC authentication support
Add OAuthConfig and OAuthHeaderProvider to the Rust core with support
for five OAuth flows: ClientCredentials, AuthorizationCodePKCE,
DeviceCode, AzureManagedIdentity, and WorkloadIdentity. Token
acquisition and auto-refresh happen entirely in Rust.

Python and TypeScript expose OAuthConfig as a plain config object that
maps to the Rust header provider via FFI — no dynamic callbacks cross
the language boundary.

ConnectBuilder gains an oauth_config() method that replaces the API key
requirement when OAuth is configured.
2026-05-18 22:18:30 -07:00
Lance Release
53c2164b84 Bump version: 0.29.0 → 0.29.1-beta.0 2026-05-18 22:07:52 +00:00
Lance Release
6286ee8192 Bump version: 0.32.0 → 0.32.1-beta.0 2026-05-18 22:06:40 +00:00
LanceDB Robot
af8ca2ad5e chore: update lance dependency to v7.0.0-beta.13 (#3399)
## Summary
- Bump Lance Rust workspace dependencies to `v7.0.0-beta.13` using
`ci/set_lance_version.py`.
- Update the Java `lance-core` dependency property to `7.0.0-beta.13`.
- Triggering tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.13

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-05-18 13:19:32 -07:00
Drew Gallardo
aac6c62459 feat(python): add public take_offsets method on Permutation (#3375)
Closes #3243.

This PR exposes a new public api `Permutation.take_offsets(offsets:
list[int])`, since users initially had to call __getitems__ directly to
batch-fetch rows by position.

Currently, the name matches the existing `Table.take_offsets` pattern,
and now the dunder `__getitem__` and `__getitems__` now delegate to it.

Also, fixes a parse error when `PermutationReader::take_offsets` gets an
empty list. Now returns an empty `RecordBatch` with the correct schema
instead. Bundled this because without the fix the new public API blows
up on a perfectly reasonable input.

`__getitems__` is preserved since PyTorch's batched DataLoader requires
it.

### Testing

- Added 3 new Rust tests for empty offsets including permutation table
with Select::All, Select::Columns, and identity path
- Added 3 new Python tests for the public API including a happy case,
and empty input on both identity and permutation

clippy, format, check all clean!

cc: @westonpace
2026-05-18 09:35:56 -07:00
Weston Pace
8df2fff75f ci: bump version after 0.29 release (#3378)
The 0.29 release happened on a branch because the main line had already
moved past the 6.0.0 stable lance release. As a result the version bump
commits ended up on the branch. This merges those commits back into
main.

---------

Co-authored-by: Lance Release <lance-dev@lancedb.com>
2026-05-18 05:34:33 -07:00
Heng Ge
0d30b31998 feat: support setting LSM write spec for a table (#3396)
## Summary

Split out from #3354

Adds `LsmWriteSpec` and `Table::set_lsm_write_spec` /
`unset_lsm_write_spec` to
install and clear the spec that selects Lance's MemWAL LSM-style write
path for
`merge_insert`.

`LsmWriteSpec` offers three sharding strategies, all built on Lance's
`InitializeMemWalBuilder`:

- `LsmWriteSpec::bucket(column, num_buckets)` — hash-bucket sharding by
the
  single-column unenforced primary key.
- `LsmWriteSpec::identity(column)` — identity sharding by the raw value
of a
  scalar column.
- `LsmWriteSpec::unsharded()` — a single MemWAL shard.

Each can be refined with `with_maintained_indexes(...)` (indexes the
MemWAL
keeps up to date as rows are appended) and
`with_writer_config_defaults(...)`
(default `ShardWriter` configuration recorded in the MemWAL index, so
every
writer starts from the same defaults). All variants require the table to
have
an unenforced primary key.

- `set_lsm_write_spec` installs the spec by initializing the MemWAL
index;
`unset_lsm_write_spec` removes it (dropping the MemWAL index), reverting
to
  the standard `merge_insert` path. `unset` is idempotent.
- Bindings: Python (`LsmWriteSpec.bucket` / `.identity` / `.unsharded`,
  `set_lsm_write_spec` / `unset_lsm_write_spec`) and TypeScript
  (`setLsmWriteSpec` with `specType` `"bucket"` / `"identity"` /
  `"unsharded"`). `RemoteTable` returns `NotSupported`.

The actual `merge_insert` LSM dispatch and `ShardWriter` write path are
a
follow-up — this PR only installs and clears the spec.
2026-05-18 00:11:33 -07:00
Heng Ge
6a431ff0a0 feat: support setting unenforced primary key (#3394)
## Summary

Adds `Table::set_unenforced_primary_key` — records a single column as
the
table's unenforced primary key in Lance schema field metadata.
"Unenforced"
means LanceDB does not check uniqueness on write; the key is metadata
that
`merge_insert` consumes.

- Single-column only; the column must exist and have a supported dtype
(Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary).
The
API accepts an iterable for binding ergonomics but requires exactly one
  column — compound keys are rejected.
- The primary key is immutable: calling this on a table that already has
an
unenforced primary key is rejected. Concurrent writers racing to set the
key
  fail at commit time rather than silently overriding it.
- `RemoteTable` returns `NotSupported`.
- Bindings: Python (`AsyncTable`, `LanceTable`, `RemoteTable`) and
TypeScript
  (`Table.setUnenforcedPrimaryKey`).

## Context

Split out from #3354 per review feedback, so the unenforced primary key
and the
`merge_insert` sharding spec land as separate reviewable PRs.

No Lance dependency bump — `main` is already on v7.0.0-beta.10, which
includes
the field-metadata round-trip fix the API relies on. Enforcing
primary-key
immutability at the Lance commit layer (so the cross-column concurrent
race is
also rejected) is a companion Lance change: lance-format/lance#6810.
2026-05-16 23:12:55 -07:00
Xin Sun
ab2c5adf5e feat(nodejs): add order_by method to Query (#3123) 2026-05-16 22:49:08 -07:00
LanceDB Robot
f02c4cad90 chore: update lance dependency to v7.0.0-beta.10 (#3393)
## Summary
- Update Lance Rust dependencies to `7.0.0-beta.10` using
`ci/set_lance_version.py`.
- Update Java `lance-core.version` to `7.0.0-beta.10`.
- Refresh `Cargo.lock` for the new Lance tag.

Triggering tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.10

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-05-16 11:58:45 -05:00
LanceDB Robot
7b74c3dd91 chore: update lance dependency to v7.0.0-beta.9 (#3391)
## Summary
- Update Lance Rust workspace dependencies from v7.0.0-beta.7 to
v7.0.0-beta.9 using `ci/set_lance_version.py`.
- Update the Java `lance-core.version` property to `7.0.0-beta.9`.
- Refresh `Cargo.lock` for the Lance dependency bump.

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`

Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.9

---------

Co-authored-by: Daniel Rammer <hamersaw@protonmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 12:56:29 -05:00
70 changed files with 15265 additions and 392 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.28.0-beta.11"
current_version = "0.29.1-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

582
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
</dependency>
```

View File

@@ -343,6 +343,30 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -690,6 +690,74 @@ of the given query
***
### setLsmWriteSpec()
```ts
abstract setLsmWriteSpec(spec): Promise<void>
```
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
LSM-style write path for future `mergeInsert` calls.
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
key (`column` and `numBuckets` required).
- `"identity"` — shard by the raw value of a scalar `column`.
- `"unsharded"` — route every write to a single shard.
All variants require the table to have an unenforced primary key
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
requires it to be the single column being bucketed.
#### Parameters
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
The sharding spec to install.
#### Returns
`Promise`&lt;`void`&gt;
#### Example
```ts
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 16,
maintainedIndexes: ["id_idx"],
});
```
***
### setUnenforcedPrimaryKey()
```ts
abstract setUnenforcedPrimaryKey(columns): Promise<void>
```
Set the unenforced primary key for this table to a single column.
"Unenforced" means LanceDB does not check uniqueness on writes; the
column is recorded in the schema as the primary key for use by features
such as `merge_insert`. Only single-column primary keys are supported,
and the key cannot be changed once set.
#### Parameters
* **columns**: `string` \| `string`[]
The primary key column. A one-element
array is also accepted; passing more than one column is rejected.
#### Returns
`Promise`&lt;`void`&gt;
***
### stats()
```ts
@@ -793,6 +861,23 @@ Return the table as an arrow table
***
### unsetLsmWriteSpec()
```ts
abstract unsetLsmWriteSpec(): Promise<void>
```
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
`mergeInsert` write path.
Errors if no spec is currently set.
#### Returns
`Promise`&lt;`void`&gt;
***
### update()
#### update(opts)

View File

@@ -498,6 +498,30 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -12,6 +12,7 @@
## Enumerations
- [FullTextQueryType](enumerations/FullTextQueryType.md)
- [OAuthFlowType](enumerations/OAuthFlowType.md)
- [Occur](enumerations/Occur.md)
- [Operator](enumerations/Operator.md)
@@ -51,6 +52,7 @@
- [AlterColumnsResult](interfaces/AlterColumnsResult.md)
- [ClientConfig](interfaces/ClientConfig.md)
- [ColumnAlteration](interfaces/ColumnAlteration.md)
- [ColumnOrdering](interfaces/ColumnOrdering.md)
- [CompactionStats](interfaces/CompactionStats.md)
- [ConnectNamespaceOptions](interfaces/ConnectNamespaceOptions.md)
- [ConnectionOptions](interfaces/ConnectionOptions.md)
@@ -79,7 +81,10 @@
- [IvfRqOptions](interfaces/IvfRqOptions.md)
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
- [MergeResult](interfaces/MergeResult.md)
- [NativeOAuthConfig](interfaces/NativeOAuthConfig.md)
- [OAuthConfig](interfaces/OAuthConfig.md)
- [OpenTableOptions](interfaces/OpenTableOptions.md)
- [OptimizeOptions](interfaces/OptimizeOptions.md)
- [OptimizeStats](interfaces/OptimizeStats.md)

View File

@@ -0,0 +1,31 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ColumnOrdering
# Interface: ColumnOrdering
## Properties
### ascending?
```ts
optional ascending: boolean;
```
***
### columnName
```ts
columnName: string;
```
***
### nullsFirst?
```ts
optional nullsFirst: boolean;
```

View File

@@ -64,6 +64,18 @@ client used by manifest-enabled native connections.
***
### oauthConfig?
```ts
optional oauthConfig: NativeOAuthConfig;
```
(For LanceDB cloud only): OAuth configuration for IdP-based
authentication (e.g., Azure Entra ID). When set, token acquisition
and refresh are handled entirely in Rust.
***
### readConsistencyInterval?
```ts

View File

@@ -0,0 +1,64 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
# Interface: LsmWriteSpec
Specification selecting Lance's MemWAL LSM-style write path for
`mergeInsert`.
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
`column` and `numBuckets` are required; for `"identity"`, `column` is
required.
## Properties
### column?
```ts
optional column: string;
```
Bucket and identity variants: the sharding column.
***
### maintainedIndexes?
```ts
optional maintainedIndexes: string[];
```
Names of indexes the MemWAL should keep up to date during writes.
***
### numBuckets?
```ts
optional numBuckets: number;
```
Bucket variant: the number of buckets, in `[1, 1024]`.
***
### specType
```ts
specType: "bucket" | "identity" | "unsharded";
```
One of `"bucket"`, `"identity"`, or `"unsharded"`.
***
### writerConfigDefaults?
```ts
optional writerConfigDefaults: Record<string, string>;
```
Default `ShardWriter` configuration recorded in the MemWAL index.

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.0.0-beta.7</lance-core.version>
<lance-core.version>7.0.0-beta.13</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.28.0-beta.11"
version = "0.29.1-beta.0"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -109,3 +109,209 @@ describe("Query outputSchema", () => {
expect(schema.fields.length).toBe(3);
});
});
describe("Query orderBy", () => {
let tmpDir: tmp.DirResult;
let table: Table;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const db = await connect(tmpDir.name);
// Create table with numeric data for sorting
const schema = new Schema([
new Field("id", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ id: 1n, score: 3.5, name: "charlie" },
{ id: 2n, score: 1.2, name: "alice" },
{ id: 3n, score: 2.8, name: "bob" },
{ id: 4n, score: 0.5, name: "david" },
{ id: 5n, score: 4.1, name: "eve" },
],
{ schema },
);
table = await db.createTable("test", data);
});
afterEach(() => {
tmpDir.removeCallback();
});
it("should sort by single column ascending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: true, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by single column descending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(1.2, 0.001);
expect(results[4].score).toBeCloseTo(0.5, 0.001);
});
it("should use ascending as default direction", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order (default)
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by string column", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.toArray();
expect(results.length).toBe(5);
// Verify alphabetical order
expect(results[0].name).toBe("alice");
expect(results[1].name).toBe("bob");
expect(results[2].name).toBe("charlie");
expect(results[3].name).toBe("david");
expect(results[4].name).toBe("eve");
});
it("should support method chaining with where", async () => {
const results = await table
.query()
.where("score > 2.0")
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(3);
// Verify filtered and sorted
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(4.1, 0.001);
});
it("should support method chaining with limit", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.toArray();
expect(results.length).toBe(3);
// Verify top 3 in descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
});
it("should support method chaining with offset", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.offset(2)
.limit(2)
.toArray();
expect(results.length).toBe(2);
// Verify results skip first 2 and take next 2
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
});
it("should support method chaining with select", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.select(["name", "score"])
.toArray();
expect(results.length).toBe(5);
// Verify only selected columns are present
expect(Object.keys(results[0])).toEqual(["name", "score"]);
expect(Object.keys(results[4])).toEqual(["name", "score"]);
// Verify sorted by name
expect(results[0].name).toBe("alice");
expect(results[4].name).toBe("eve");
});
it("should support complex method chaining", async () => {
const results = await table
.query()
.where("score > 1.0")
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.select(["id", "score", "name"])
.toArray();
expect(results.length).toBe(3);
// Verify filtered, sorted, limited, and projected
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(Object.keys(results[0])).toEqual(["id", "score", "name"]);
});
it("should support multi-column ordering and null placement", async () => {
const schema = new Schema([
new Field("group", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ group: 1n, score: null, name: "z" },
{ group: 1n, score: 1.0, name: "b" },
{ group: 1n, score: 1.0, name: "a" },
{ group: 2n, score: 0.5, name: "c" },
],
{ schema },
);
const nullTable = await (await connect(tmpDir.name)).createTable(
"test_multi_order",
data,
{ mode: "overwrite" },
);
const results = await nullTable
.query()
.orderBy([
{ columnName: "group", ascending: true, nullsFirst: false },
{ columnName: "score", ascending: true, nullsFirst: true },
{ columnName: "name", ascending: true, nullsFirst: false },
])
.toArray();
expect(results.map((r) => [r.group, r.score, r.name])).toEqual([
[1n, null, "z"],
[1n, 1.0, "a"],
[1n, 1.0, "b"],
[2n, 0.5, "c"],
]);
});
});

View File

@@ -2348,3 +2348,130 @@ describe("when creating a table with Float32Array vectors", () => {
expect((fsl.children[0].type as Float32).precision).toBe(1);
});
});
describe("setUnenforcedPrimaryKey", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
it("sets a single-column primary key (string or one-element array)", async () => {
const conn = await connect(tmpDir.name);
const schema = new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
]);
const t1 = await conn.createEmptyTable("t1", schema);
await t1.setUnenforcedPrimaryKey("id");
const t2 = await conn.createEmptyTable("t2", schema);
await t2.setUnenforcedPrimaryKey(["id"]);
});
it("rejects a compound primary key", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await expect(
table.setUnenforcedPrimaryKey(["id", "name"]),
).rejects.toThrow();
});
it("rejects changing the primary key once set", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await table.setUnenforcedPrimaryKey("id");
await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
});
});
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
async function makeTable(conn: Connection): Promise<Table> {
return await conn.createEmptyTable(
"t",
new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]),
);
}
it("installs and removes a bucket spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 4,
});
await table.unsetLsmWriteSpec();
// A second unset errors — there is no spec left to remove.
await expect(table.unsetLsmWriteSpec()).rejects.toThrow();
// A fresh spec can be installed after unset.
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 8,
});
});
it("installs an unsharded spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "unsharded" });
await table.unsetLsmWriteSpec();
});
it("installs an identity spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "identity", column: "id" });
await table.unsetLsmWriteSpec();
});
it("rejects an invalid spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
// num_buckets out of range.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 0,
}),
).rejects.toThrow();
// Column mismatch.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "missing",
numBuckets: 4,
}),
).rejects.toThrow();
});
});

View File

@@ -38,5 +38,14 @@ test("filtering examples", async () => {
// --8<-- [start:sql_search]
await tbl.query().where("id = 10").limit(10).toArray();
// --8<-- [end:sql_search]
// --8<-- [start:orderby_search]
await tbl
.query()
.where("id > 10")
.orderBy({ columnName: "id", ascending: false })
.limit(5)
.toArray();
// --8<-- [end:orderby_search]
});
});

View File

@@ -50,6 +50,7 @@ export {
SplitHashOptions,
SplitSequentialOptions,
ShuffleOptions,
OAuthConfig as NativeOAuthConfig,
} from "./native.js";
export {
@@ -82,6 +83,7 @@ export {
VectorQuery,
TakeQuery,
QueryExecutionOptions,
ColumnOrdering,
FullTextSearchOptions,
RecordBatchIterator,
FullTextQuery,
@@ -112,6 +114,7 @@ export {
UpdateOptions,
OptimizeOptions,
Version,
LsmWriteSpec,
ColumnAlteration,
} from "./table";
@@ -122,6 +125,8 @@ export {
TokenResponse,
} from "./header";
export { OAuthConfig, OAuthFlowType } from "./oauth";
export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
export * as embedding from "./embedding";

82
nodejs/lancedb/oauth.ts Normal file
View File

@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
/**
* OAuth authentication flow types.
*/
export enum OAuthFlowType {
/** Client Credentials grant (service-to-service / M2M). */
ClientCredentials = "client_credentials",
/** Authorization Code with PKCE (interactive browser-based auth). */
AuthorizationCodePKCE = "authorization_code_pkce",
/** Device Code grant (CLI / headless environments). */
DeviceCode = "device_code",
/** Azure Managed Identity via IMDS. */
AzureManagedIdentity = "azure_managed_identity",
/** Workload Identity Federation (K8s, GitHub Actions). */
WorkloadIdentity = "workload_identity",
}
/**
* OAuth configuration for LanceDB authentication.
*
* All token acquisition and refresh is handled in the Rust layer.
* This config is passed through to Rust via napi-rs.
*
* @example Client Credentials (service-to-service):
* ```typescript
* const config: OAuthConfig = {
* issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
* clientId: "app-id",
* clientSecret: "secret",
* scopes: ["api://lancedb-api/.default"],
* };
* ```
*
* @example Azure Managed Identity:
* ```typescript
* const config: OAuthConfig = {
* issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
* clientId: "app-id",
* scopes: ["api://lancedb-api/.default"],
* flow: OAuthFlowType.AzureManagedIdentity,
* };
* ```
*/
export interface OAuthConfig {
/**
* OIDC issuer URL or OAuth authority URL.
* For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
*/
issuerUrl: string;
/** Application / Client ID. */
clientId: string;
/**
* OAuth scopes to request.
* For Azure: `["api://{app_id}/.default"]`
*/
scopes: string[];
/** Authentication flow (default: ClientCredentials). */
flow?: OAuthFlowType;
/** Client secret (required for ClientCredentials). */
clientSecret?: string;
/** Redirect URI (AuthorizationCodePKCE flow). */
redirectUri?: string;
/** Port for local callback server (AuthorizationCodePKCE, default: 8400). */
callbackPort?: number;
/** Client ID for user-assigned managed identity (AzureManagedIdentity). */
managedIdentityClientId?: string;
/** Path to federated token file (WorkloadIdentity). */
tokenFile?: string;
/** Seconds before expiry to trigger proactive refresh (default: 300). */
refreshBufferSecs?: number;
}

View File

@@ -79,6 +79,12 @@ export interface QueryExecutionOptions {
timeoutMs?: number;
}
export interface ColumnOrdering {
columnName: string;
ascending?: boolean;
nullsFirst?: boolean;
}
/**
* Options that control the behavior of a full text search
*/
@@ -417,6 +423,21 @@ export class StandardQueryBase<
return this;
}
/**
* Sort the results by the specified column(s).
* @returns This query builder.
*/
orderBy(ordering: ColumnOrdering | ColumnOrdering[]): this {
const orderings = Array.isArray(ordering) ? ordering : [ordering];
const normalized = orderings.map((o) => ({
columnName: o.columnName,
ascending: o.ascending ?? true,
nullsFirst: o.nullsFirst ?? false,
}));
this.doCall((inner) => inner.orderBy(normalized));
return this;
}
/**
* Skip searching un-indexed data. This can make search faster, but will miss
* any data that is not yet indexed.

View File

@@ -106,6 +106,27 @@ export interface Version {
metadata: Record<string, string>;
}
/**
* Specification selecting Lance's MemWAL LSM-style write path for
* `mergeInsert`.
*
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
* `column` and `numBuckets` are required; for `"identity"`, `column` is
* required.
*/
export interface LsmWriteSpec {
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
specType: "bucket" | "identity" | "unsharded";
/** Bucket and identity variants: the sharding column. */
column?: string;
/** Bucket variant: the number of buckets, in `[1, 1024]`. */
numBuckets?: number;
/** Names of indexes the MemWAL should keep up to date during writes. */
maintainedIndexes?: string[];
/** Default `ShardWriter` configuration recorded in the MemWAL index. */
writerConfigDefaults?: Record<string, string>;
}
/**
* A Table is a collection of Records in a LanceDB Database.
*
@@ -449,6 +470,54 @@ export abstract class Table {
* containing the new version number of the table after dropping the columns.
*/
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
/**
* Set the unenforced primary key for this table to a single column.
*
* "Unenforced" means LanceDB does not check uniqueness on writes; the
* column is recorded in the schema as the primary key for use by features
* such as `merge_insert`. Only single-column primary keys are supported,
* and the key cannot be changed once set.
* @param {string | string[]} columns The primary key column. A one-element
* array is also accepted; passing more than one column is rejected.
* @returns {Promise<void>}
*/
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
/**
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
* LSM-style write path for future `mergeInsert` calls.
*
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
*
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
* key (`column` and `numBuckets` required).
* - `"identity"` — shard by the raw value of a scalar `column`.
* - `"unsharded"` — route every write to a single shard.
*
* All variants require the table to have an unenforced primary key
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
* requires it to be the single column being bucketed.
* @param {LsmWriteSpec} spec The sharding spec to install.
* @returns {Promise<void>}
* @example
* ```ts
* await table.setUnenforcedPrimaryKey("id");
* await table.setLsmWriteSpec({
* specType: "bucket",
* column: "id",
* numBuckets: 16,
* maintainedIndexes: ["id_idx"],
* });
* ```
*/
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
/**
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
* `mergeInsert` write path.
*
* Errors if no spec is currently set.
* @returns {Promise<void>}
*/
abstract unsetLsmWriteSpec(): Promise<void>;
/** Retrieve the version of the table */
abstract version(): Promise<number>;
@@ -897,6 +966,19 @@ export class LocalTable extends Table {
return await this.inner.dropColumns(columnNames);
}
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
const cols = typeof columns === "string" ? [columns] : columns;
return await this.inner.setUnenforcedPrimaryKey(cols);
}
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
return await this.inner.setLsmWriteSpec(spec);
}
async unsetLsmWriteSpec(): Promise<void> {
return await this.inner.unsetLsmWriteSpec();
}
async version(): Promise<number> {
return await this.inner.version();
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

11029
nodejs/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -112,6 +112,11 @@ impl Connection {
builder = builder.client_config(rust_config);
if let Some(oauth_config) = options.oauth_config {
let config: lancedb::remote::oauth::OAuthConfig = oauth_config.into();
builder = builder.oauth_config(config);
}
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}

View File

@@ -61,6 +61,10 @@ pub struct ConnectionOptions {
/// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
/// for testing purposes.
pub host_override: Option<String>,
/// (For LanceDB cloud only): OAuth configuration for IdP-based
/// authentication (e.g., Azure Entra ID). When set, token acquisition
/// and refresh are handled entirely in Rust.
pub oauth_config: Option<remote::OAuthConfig>,
}
#[napi(object)]

View File

@@ -3,6 +3,12 @@
use std::sync::Arc;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
use arrow_array::{
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
@@ -19,16 +25,27 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::Select;
use lancedb::query::TakeQuery as LanceDbTakeQuery;
use lancedb::query::VectorQuery as LanceDbVectorQuery;
use lancedb::query::{ColumnOrdering as LanceDbColumnOrdering, VectorQuery as LanceDbVectorQuery};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
#[napi(object)]
pub struct ColumnOrdering {
pub ascending: bool,
pub nulls_first: bool,
pub column_name: String,
}
impl From<ColumnOrdering> for LanceDbColumnOrdering {
fn from(value: ColumnOrdering) -> Self {
match (value.ascending, value.nulls_first) {
(true, true) => Self::asc_nulls_first(value.column_name),
(true, false) => Self::asc_nulls_last(value.column_name),
(false, true) => Self::desc_nulls_first(value.column_name),
(false, false) => Self::desc_nulls_last(value.column_name),
}
}
}
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
let buf = arrow_buffer::Buffer::from(data.to_vec());
@@ -128,6 +145,18 @@ impl Query {
self.inner = self.inner.clone().with_row_id();
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;
@@ -328,6 +357,18 @@ impl VectorQuery {
Ok(())
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;

View File

@@ -140,6 +140,67 @@ impl From<TlsConfig> for lancedb::remote::TlsConfig {
}
}
/// OAuth configuration for LanceDB authentication.
/// All token acquisition and refresh is handled in the Rust layer.
#[napi(object)]
#[derive(Debug, Clone)]
pub struct OAuthConfig {
/// OIDC issuer URL or OAuth authority URL.
/// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
pub issuer_url: String,
/// Application / Client ID.
pub client_id: String,
/// OAuth scopes to request. For Azure: `["api://{app_id}/.default"]`
pub scopes: Vec<String>,
/// Authentication flow: "client_credentials", "authorization_code_pkce",
/// "device_code", "azure_managed_identity", "workload_identity"
pub flow: Option<String>,
/// Client secret (required for client_credentials).
pub client_secret: Option<String>,
/// Redirect URI (authorization_code_pkce flow).
pub redirect_uri: Option<String>,
/// Port for local callback server (authorization_code_pkce, default: 8400).
pub callback_port: Option<u16>,
/// Client ID for user-assigned managed identity (azure_managed_identity).
pub managed_identity_client_id: Option<String>,
/// Path to federated token file (workload_identity).
pub token_file: Option<String>,
/// Seconds before expiry to trigger proactive refresh (default: 300).
pub refresh_buffer_secs: Option<u32>,
}
impl From<OAuthConfig> for lancedb::remote::oauth::OAuthConfig {
fn from(config: OAuthConfig) -> Self {
use lancedb::remote::oauth::OAuthFlow;
let flow = match config.flow.as_deref().unwrap_or("client_credentials") {
"authorization_code_pkce" => OAuthFlow::AuthorizationCodePKCE {
redirect_uri: config.redirect_uri,
callback_port: config.callback_port,
},
"device_code" => OAuthFlow::DeviceCode,
"azure_managed_identity" => OAuthFlow::AzureManagedIdentity {
client_id: config.managed_identity_client_id,
},
"workload_identity" => OAuthFlow::WorkloadIdentity {
token_file: config
.token_file
.expect("tokenFile is required for workload_identity flow"),
},
other => panic!("Unknown OAuth flow type: {other}"),
};
Self {
issuer_url: config.issuer_url,
client_id: config.client_id,
client_secret: config.client_secret,
scopes: config.scopes,
flow,
refresh_buffer_secs: config.refresh_buffer_secs.map(|v| v as u64),
}
}
}
impl From<ClientConfig> for lancedb::remote::ClientConfig {
fn from(config: ClientConfig) -> Self {
Self {

View File

@@ -344,6 +344,31 @@ impl Table {
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
self.inner_ref()?
.set_unenforced_primary_key(columns)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
self.inner_ref()?
.set_lsm_write_spec(native_spec)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
self.inner_ref()?
.unset_lsm_write_spec()
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
@@ -538,6 +563,63 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `mergeInsert`.
///
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
/// `column` is required.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
pub spec_type: String,
/// Bucket and identity variants: the sharding column.
pub column: Option<String>,
/// Bucket variant: the number of buckets, in `[1, 1024]`.
pub num_buckets: Option<u32>,
/// Names of indexes the MemWAL should keep up to date during writes.
pub maintained_indexes: Option<Vec<String>>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
pub writer_config_defaults: Option<HashMap<String, String>>,
}
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
type Error = napi::Error;
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
let maintained = value.maintained_indexes.unwrap_or_default();
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
let spec = match value.spec_type.as_str() {
"bucket" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
})?;
let num_buckets = value.num_buckets.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
})?;
Self::bucket(column, num_buckets)
}
"identity" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
})?;
Self::identity(column)
}
"unsharded" => Self::unsharded(),
other => {
return Err(napi::Error::from_reason(format!(
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
other
)));
}
};
Ok(spec
.with_maintained_indexes(maintained)
.with_writer_config_defaults(writer_config_defaults))
}
}
/// Statistics about a compaction operation.
#[napi(object)]
#[derive(Clone, Debug)]

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.11"
current_version = "0.32.1-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.31.0-beta.11"
version = "0.32.1-beta.0"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -320,6 +320,7 @@ async def connect_async(
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
oauth_config=None,
) -> AsyncConnection:
"""Connect to a LanceDB database.
@@ -410,6 +411,7 @@ async def connect_async(
session,
manifest_enabled,
namespace_client_properties,
oauth_config,
)
)

View File

@@ -217,6 +217,9 @@ class Table:
async def uri(self) -> str: ...
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
async def unset_lsm_write_spec(self) -> None: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
@@ -247,6 +250,7 @@ async def connect(
session: Optional[Session],
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
oauth_config: Optional[Any] = None,
) -> Connection: ...
class RecordBatchStream:
@@ -255,6 +259,11 @@ class RecordBatchStream:
def __aiter__(self) -> "RecordBatchStream": ...
async def __anext__(self) -> pa.RecordBatch: ...
class ColumnOrdering(TypedDict):
column_name: str
ascending: bool
nulls_first: bool
class Query:
def where(self, filter: str): ...
def where_expr(self, expr: PyExpr): ...
@@ -268,6 +277,7 @@ class Query:
def postfilter(self): ...
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
def nearest_to_text(self, query: dict) -> FTSQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -296,6 +306,7 @@ class FTSQuery:
def get_query(self) -> str: ...
def add_query_vector(self, query_vec: pa.Array) -> None: ...
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -321,6 +332,7 @@ class VectorQuery:
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def nearest_to_text(self, query: dict) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_query_request(self) -> PyQueryRequest: ...
class HybridQuery:
@@ -339,6 +351,7 @@ class HybridQuery:
def minimum_nprobes(self, minimum_nprobes: int): ...
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_vector_query(self) -> VectorQuery: ...
def to_fts_query(self) -> FTSQuery: ...
def get_limit(self) -> int: ...
@@ -368,6 +381,7 @@ class PyQueryRequest:
bypass_vector_index: Optional[bool]
postfilter: Optional[bool]
norm: Optional[str]
order_by: Optional[List[ColumnOrdering]]
class CompactionStats:
fragments_removed: int
@@ -408,6 +422,37 @@ class MergeResult:
num_deleted_rows: int
num_attempts: int
class LsmWriteSpec:
"""Specification selecting Lance's MemWAL LSM-style write path for
`merge_insert`."""
@staticmethod
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
@staticmethod
def identity(column: str) -> "LsmWriteSpec": ...
@staticmethod
def unsharded() -> "LsmWriteSpec": ...
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
"""Return a copy of this spec asking the MemWAL to keep the named
indexes up to date as rows are appended."""
...
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
"""Return a copy of this spec recording the given default
`ShardWriter` configuration in the MemWAL index."""
...
@property
def spec_type(self) -> str:
"""One of 'bucket', 'identity', or 'unsharded'."""
...
@property
def column(self) -> Optional[str]: ...
@property
def num_buckets(self) -> Optional[int]: ...
@property
def maintained_indexes(self) -> List[str]: ...
@property
def writer_config_defaults(self) -> Dict[str, str]: ...
class AddColumnsResult:
version: int

View File

@@ -968,22 +968,32 @@ class Permutation:
new.transform_fn = transform
return new
def take_offsets(self, offsets: list[int]) -> Any:
"""
Take rows from the permutation by offset
The returned value is passed through the permutation's current transform,
so `with_format` and `with_transform` affect this method in the same way
they affect iteration.
"""
async def do_take_offsets():
return await self.reader.take_offsets(offsets, selection=self.selection)
batch = LOOP.run(do_take_offsets())
return self.transform_fn(batch)
def __getitem__(self, index: int) -> Any:
"""
Returns a single row from the permutation by offset
"""
return self.__getitems__([index])
return self.take_offsets([index])
def __getitems__(self, indices: list[int]) -> Any:
"""
Returns rows from the permutation by offset
"""
async def do_getitems():
return await self.reader.take_offsets(indices, selection=self.selection)
batch = LOOP.run(do_getitems())
return self.transform_fn(batch)
return self.take_offsets(indices)
@deprecated(details="Use with_skip instead")
def skip(self, skip: int) -> "Permutation":

View File

@@ -92,6 +92,12 @@ def ensure_vector_query(
return val
class ColumnOrdering(pydantic.BaseModel):
column_name: str
ascending: bool = True
nulls_first: bool = False
class FullTextQueryType(str, Enum):
MATCH = "match"
MATCH_PHRASE = "match_phrase"
@@ -504,6 +510,8 @@ class Query(pydantic.BaseModel):
# Bypass the vector index and use a brute force search
bypass_vector_index: Optional[bool] = None
order_by: Optional[List[ColumnOrdering]] = None
@classmethod
def from_inner(cls, req: PyQueryRequest) -> Self:
query = cls()
@@ -524,6 +532,8 @@ class Query(pydantic.BaseModel):
query.refine_factor = req.refine_factor
query.bypass_vector_index = req.bypass_vector_index
query.postfilter = req.postfilter
if req.order_by is not None:
query.order_by = [ColumnOrdering(**o) for o in req.order_by]
if req.full_text_search is not None:
query.full_text_query = FullTextSearchQuery(
columns=None,
@@ -572,9 +582,22 @@ class LanceQueryBuilder(ABC):
If "auto", the query type is inferred based on the query.
vector_column_name: str
The name of the vector column to use for vector search.
ordering_field_name: Optional[str]
.. deprecated:: 0.27.0
Use ``order_by()`` method instead.
fts_columns: Optional[Union[str, List[str]]]
The columns to search in for full text search.
fast_search: bool
Skip flat search of unindexed data.
"""
if ordering_field_name is not None:
import warnings
warnings.warn(
"ordering_field_name is deprecated, use .order_by() method instead.",
DeprecationWarning,
stacklevel=2,
)
# Check hybrid search first as it supports empty query pattern
if query_type == "hybrid":
# hybrid fts and vector query
@@ -671,6 +694,7 @@ class LanceQueryBuilder(ABC):
self._text = None
self._ef = None
self._bypass_vector_index = None
self._order_by = None
@deprecation.deprecated(
deprecated_in="0.3.1",
@@ -947,6 +971,24 @@ class LanceQueryBuilder(ABC):
""" # noqa: E501
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
Returns
-------
LanceQueryBuilder
The LanceQueryBuilder object.
"""
self._order_by = ordering
return self
def analyze_plan(self) -> str:
"""
Run the query and return its execution plan with runtime metrics.
@@ -1314,6 +1356,7 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
fast_search=self._fast_search,
ef=self._ef,
bypass_vector_index=self._bypass_vector_index,
order_by=self._order_by,
)
def to_batches(
@@ -1465,7 +1508,9 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
super().__init__(table)
self._query = query
self._phrase_query = False
self.ordering_field_name = ordering_field_name
# Deprecated compatibility parameter. Native FTS ordering is now
# configured through order_by(); LanceQueryBuilder.create emits the warning.
_ = ordering_field_name
self._reranker = None
self._fast_search = fast_search
if isinstance(fts_columns, str):
@@ -1514,6 +1559,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
),
offset=self._offset,
fast_search=self._fast_search,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -1579,6 +1625,7 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
limit=self._limit,
with_row_id=self._with_row_id,
offset=self._offset,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -2502,6 +2549,27 @@ class AsyncStandardQuery(AsyncQueryBase):
self._inner.offset(offset)
return self
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
"""
if ordering is None:
self._inner.order_by(None)
else:
self._inner.order_by(
[
o.model_dump() if hasattr(o, "model_dump") else o.dict()
for o in ordering
]
)
return self
def fast_search(self) -> Self:
"""
Skip searching un-indexed data.

View File

@@ -9,6 +9,7 @@ from typing import List, Optional
from lancedb import __version__
from .header import HeaderProvider
from .oauth import OAuthConfig, OAuthFlowType
__all__ = [
"TimeoutConfig",
@@ -16,6 +17,8 @@ __all__ = [
"TlsConfig",
"ClientConfig",
"HeaderProvider",
"OAuthConfig",
"OAuthFlowType",
]

View File

@@ -0,0 +1,90 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
class OAuthFlowType(str, Enum):
"""OAuth authentication flow types."""
CLIENT_CREDENTIALS = "client_credentials"
"""Client Credentials grant (service-to-service / M2M)."""
AUTHORIZATION_CODE_PKCE = "authorization_code_pkce"
"""Authorization Code with PKCE (interactive browser-based auth)."""
DEVICE_CODE = "device_code"
"""Device Code grant (CLI / headless environments)."""
AZURE_MANAGED_IDENTITY = "azure_managed_identity"
"""Azure Managed Identity via IMDS."""
WORKLOAD_IDENTITY = "workload_identity"
"""Workload Identity Federation (K8s, GitHub Actions)."""
@dataclass
class OAuthConfig:
"""OAuth configuration for LanceDB authentication.
All token acquisition and refresh is handled in the Rust layer.
This config is passed through to Rust via PyO3.
Parameters
----------
issuer_url : str
OIDC issuer URL or OAuth authority URL.
For Azure: ``https://login.microsoftonline.com/{tenant_id}/v2.0``
client_id : str
Application / Client ID.
scopes : List[str]
OAuth scopes to request.
For Azure: ``["api://{app_id}/.default"]``
flow : OAuthFlowType
Authentication flow to use. Default: CLIENT_CREDENTIALS.
client_secret : Optional[str]
Client secret (required for CLIENT_CREDENTIALS).
redirect_uri : Optional[str]
Redirect URI for AUTHORIZATION_CODE_PKCE flow.
callback_port : Optional[int]
Port for local HTTP callback server (AUTHORIZATION_CODE_PKCE, default: 8400).
managed_identity_client_id : Optional[str]
Client ID for user-assigned managed identity (AZURE_MANAGED_IDENTITY).
token_file : Optional[str]
Path to federated token file (WORKLOAD_IDENTITY).
refresh_buffer_secs : Optional[int]
Seconds before expiry to trigger proactive refresh (default: 300).
Examples
--------
Client Credentials (service-to-service):
>>> config = OAuthConfig(
... issuer_url="https://login.microsoftonline.com/{tenant}/v2.0",
... client_id="app-id",
... client_secret="secret",
... scopes=["api://lancedb-api/.default"],
... )
Azure Managed Identity:
>>> config = OAuthConfig(
... issuer_url="https://login.microsoftonline.com/{tenant}/v2.0",
... client_id="app-id",
... scopes=["api://lancedb-api/.default"],
... flow=OAuthFlowType.AZURE_MANAGED_IDENTITY,
... )
"""
issuer_url: str
client_id: str
scopes: List[str]
flow: OAuthFlowType = OAuthFlowType.CLIENT_CREDENTIALS
client_secret: Optional[str] = None
redirect_uri: Optional[str] = None
callback_port: Optional[int] = None
managed_identity_client_id: Optional[str] = None
token_file: Optional[str] = None
refresh_buffer_secs: Optional[int] = None

View File

@@ -14,6 +14,7 @@ from lancedb._lancedb import (
DeleteResult,
DropColumnsResult,
IndexConfig,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -655,6 +656,18 @@ class RemoteTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.unset_lsm_write_spec())
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))

View File

@@ -154,6 +154,7 @@ if TYPE_CHECKING:
AlterColumnsResult,
DeleteResult,
DropColumnsResult,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -3263,6 +3264,21 @@ class LanceTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Set the unenforced primary key. See
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec. See
[`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec. See
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
return LOOP.run(self._table.unset_lsm_write_spec())
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
@@ -3808,6 +3824,69 @@ class AsyncTable:
Any attempt to use the table after it has been closed will raise an error."""
return self._inner.close()
async def set_unenforced_primary_key(
self, columns: Union[str, Iterable[str]]
) -> None:
"""Set the unenforced primary key for this table to the given
ordered list of columns.
"Unenforced" means LanceDB does not check uniqueness on writes; the
columns are recorded in the schema as the primary key so that
features such as `merge_insert` can use them. Calling this again
replaces any previously-set primary key.
Parameters
----------
columns : str or Iterable[str]
Either a single column name (single-column key) or an ordered
iterable of column names (composite key). Each column dtype
must be one of: int32, int64, utf8, large_utf8, binary,
large_binary, fixed_size_binary.
"""
if isinstance(columns, str):
columns = [columns]
else:
columns = list(columns)
await self._inner.set_unenforced_primary_key(columns)
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec on this table.
The spec selects Lance's MemWAL LSM-style write path for future
`merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding
strategies:
- ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
the single-column unenforced primary key.
- ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
scalar column.
- ``LsmWriteSpec.unsharded()`` — route every write to a single shard.
All variants require the table to have an unenforced primary key set
via [`set_unenforced_primary_key`]; bucket sharding additionally
requires it to be the single column being bucketed.
Parameters
----------
spec : LsmWriteSpec
The sharding spec to install.
Examples
--------
>>> from lancedb._lancedb import LsmWriteSpec
>>> # table.set_unenforced_primary_key("id")
>>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
"""
await self._inner.set_lsm_write_spec(spec)
async def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec from this table.
Reverts to the standard `merge_insert` write path. Errors if no spec
is currently set.
"""
await self._inner.unset_lsm_write_spec()
@property
def name(self) -> str:
"""The name of the table."""
@@ -4512,6 +4591,8 @@ class AsyncTable:
async_query = async_query.fast_search()
if query.with_row_id:
async_query = async_query.with_row_id()
if query.order_by:
async_query = async_query.order_by(query.order_by)
if query.vector:
async_query = async_query.nearest_to(query.vector).distance_range(

View File

@@ -1,4 +1,3 @@
segmenter:
mode: "normal"
dictionary:
path: "./python/tests/models/lindera/ipadic/main"
dictionary: "./python/tests/models/lindera/ipadic/main"

View File

@@ -29,6 +29,7 @@ from lancedb.query import (
MultiMatchQuery,
PhraseQuery,
BooleanQuery,
ColumnOrdering,
Occur,
LanceFtsQueryBuilder,
)
@@ -116,8 +117,7 @@ def lindera_ipadic(language_model_home):
config_path.write_text(
"segmenter:\n"
' mode: "normal"\n'
" dictionary:\n"
f' path: "{extracted_model.resolve().as_posix()}"\n',
f' dictionary: "{extracted_model.resolve().as_posix()}"\n',
encoding="utf-8",
)
@@ -500,6 +500,36 @@ async def test_search_fts_specify_column_async(async_table):
pass
def test_search_order_by_descending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=False)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"], reverse=True) == rows
def test_search_order_by_ascending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=True)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"]) == rows
def test_create_index_from_table(tmp_path, table):
table.create_fts_index("text")
df = table.search("puppy").limit(5).select(["text"]).to_pandas()

View File

@@ -0,0 +1,149 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for installing and clearing an LsmWriteSpec via
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
"""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
SCHEMA = pa.schema(
[
pa.field("id", pa.utf8(), nullable=False),
pa.field("v", pa.int32(), nullable=False),
]
)
def _batch(ids, vs):
return pa.RecordBatch.from_arrays(
[pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())],
schema=SCHEMA,
)
def _reader(ids, vs):
return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)])
def _make_table(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader(["seed"], [0]))
return db, table
def test_set_lsm_write_spec_validates(tmp_path):
_db, table = _make_table(tmp_path)
# No PK set yet.
with pytest.raises(Exception, match="primary key"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.set_unenforced_primary_key("id")
# Column mismatch.
with pytest.raises(Exception, match="match"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
# Out-of-range num_buckets.
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025))
# Happy path then mutation rejected.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
with pytest.raises(Exception, match="mutation"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_unset_lsm_write_spec(tmp_path):
_db, table = _make_table(tmp_path)
# unset errors when no spec is set.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
# Install a spec, then remove it; afterwards a fresh spec can be set.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.unset_lsm_write_spec()
# A second unset errors — there is no spec left to remove.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_set_unsharded_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Lance MemWAL still requires a primary key on the dataset; Unsharded
# just skips per-row hashing.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
table.unset_lsm_write_spec()
def test_lsm_write_spec_repr():
s = LsmWriteSpec.bucket("id", 4)
assert s.spec_type == "bucket"
assert s.column == "id"
assert s.num_buckets == 4
assert s.maintained_indexes == []
assert "bucket" in repr(s)
assert "id" in repr(s)
assert "4" in repr(s)
u = LsmWriteSpec.unsharded()
assert u.spec_type == "unsharded"
assert u.column is None
assert u.num_buckets is None
assert "unsharded" in repr(u)
def test_lsm_write_spec_with_maintained_indexes():
s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"])
assert s.maintained_indexes == ["idx_a", "idx_b"]
@pytest.mark.asyncio
async def test_async_set_unset_lsm_write_spec(tmp_path):
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=0)
)
table = await db.create_table(
"t",
pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])]),
)
await table.set_unenforced_primary_key("id")
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
await table.unset_lsm_write_spec()
# A second unset errors.
with pytest.raises(Exception, match="no LSM write spec"):
await table.unset_lsm_write_spec()
def test_set_identity_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Identity sharding still requires an unenforced primary key on the
# table; it shards by the raw value of the given column.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.identity("v"))
table.unset_lsm_write_spec()
def test_lsm_write_spec_identity_and_writer_config_defaults():
s = LsmWriteSpec.identity("v")
assert s.spec_type == "identity"
assert s.column == "v"
assert s.num_buckets is None
assert "identity" in repr(s)
s = s.with_writer_config_defaults({"durable_write": "false"})
assert s.writer_config_defaults == {"durable_write": "false"}
assert "durable_write" in repr(s)

View File

@@ -1080,3 +1080,29 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
"""Test __getitems__ with an out-of-range offset raises an error."""
with pytest.raises(Exception):
some_permutation.__getitems__([999999])
def test_take_offsets(some_permutation: Permutation):
result = some_permutation.take_offsets([0, 1, 2])
assert isinstance(result, list)
assert "id" in result[0]
assert "value" in result[0]
assert len(result) == 3
def test_take_offsets_empty_identity_permutation(mem_db):
tbl = mem_db.create_table(
"test_table", pa.table({"id": range(10), "value": range(10)})
)
permutation = Permutation.identity(tbl)
result = permutation.take_offsets([])
assert result == []
def test_take_offsets_empty_permutation(some_permutation: Permutation):
result = some_permutation.take_offsets([])
assert result == []

View File

@@ -0,0 +1,79 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for Table.set_unenforced_primary_key."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
def _empty_table(path, schema):
db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0))
return db.create_table("t", schema=schema)
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
# Bare string.
table = _empty_table(tmp_path / "s", schema)
table.set_unenforced_primary_key("id")
# One-element list.
table = _empty_table(tmp_path / "l", schema)
table.set_unenforced_primary_key(["id"])
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
# Compound keys are not supported.
with pytest.raises(Exception, match="compound"):
table.set_unenforced_primary_key(["a", "b"])
# Empty input.
with pytest.raises(Exception, match="required"):
table.set_unenforced_primary_key([])
def test_set_unenforced_primary_key_is_immutable(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
table.set_unenforced_primary_key("a")
# The primary key cannot be changed or re-set once installed.
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("b")
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("a")
def test_set_unenforced_primary_key_validates(tmp_path):
table = _empty_table(
tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)])
)
# Unknown column.
with pytest.raises(Exception, match="not found"):
table.set_unenforced_primary_key("nonexistent")
# Unsupported dtype (Float32 not in the supported set).
bad = _empty_table(
tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)])
)
with pytest.raises(Exception, match="not supported"):
bad.set_unenforced_primary_key("id")

View File

@@ -25,6 +25,7 @@ from lancedb.query import (
AsyncHybridQuery,
AsyncQueryBase,
AsyncVectorQuery,
ColumnOrdering,
LanceVectorQueryBuilder,
MatchQuery,
PhraseQuery,
@@ -164,6 +165,71 @@ def test_offset(table):
assert len(results_with_offset.to_pandas()) == 1
def test_order_by_plain_query(mem_db):
table = mem_db.create_table(
"test_order_by",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = (
table.search()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
@pytest.mark.asyncio
async def test_order_by_async_query(mem_db_async: AsyncConnection):
table = await mem_db_async.create_table(
"test_order_by_async",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = await (
table.query()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
def test_query_builder(table):
rs = (
LanceVectorQueryBuilder(table, [0, 0], "vector")

View File

@@ -16,6 +16,7 @@ from packaging.version import Version
import lancedb
from lancedb.conftest import MockTextEmbeddingFunction
from lancedb.query import ColumnOrdering
from lancedb.remote import ClientConfig
from lancedb.remote.errors import HttpError, RetryError
import pytest
@@ -660,6 +661,18 @@ def test_query_sync_maximal():
"ef": None,
"filter": "id > 0",
"columns": ["id", "name"],
"order_by": [
{
"column_name": "score",
"ascending": False,
"nulls_first": True,
},
{
"column_name": "id",
"ascending": True,
"nulls_first": False,
},
],
"vector_column": "vector2",
"fast_search": True,
"with_row_id": True,
@@ -677,6 +690,14 @@ def test_query_sync_maximal():
.refine_factor(10)
.nprobes(5)
.where("id > 0", prefilter=True)
.order_by(
[
ColumnOrdering(
column_name="score", ascending=False, nulls_first=True
),
ColumnOrdering(column_name="id", ascending=True, nulls_first=False),
]
)
.with_row_id(True)
.select(["id", "name"])
.to_list()

View File

@@ -539,7 +539,7 @@ impl Connection {
}
#[pyfunction]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None, oauth_config=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
py: Python<'_>,
@@ -553,6 +553,7 @@ pub fn connect(
session: Option<crate::session::Session>,
manifest_enabled: bool,
namespace_client_properties: Option<HashMap<String, String>>,
oauth_config: Option<crate::oauth::PyOAuthConfig>,
) -> PyResult<Bound<'_, PyAny>> {
future_into_py(py, async move {
let mut builder = lancedb::connect(&uri);
@@ -582,6 +583,10 @@ pub fn connect(
if let Some(client_config) = client_config {
builder = builder.client_config(client_config.into());
}
if let Some(oauth_config) = oauth_config {
let config: lancedb::remote::oauth::OAuthConfig = oauth_config.into();
builder = builder.oauth_config(config);
}
if let Some(session) = session {
builder = builder.session(session.inner.clone());
}

View File

@@ -15,8 +15,8 @@ use pyo3::{
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;
use table::{
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
Table, UpdateResult,
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
MergeResult, Table, UpdateResult,
};
pub mod arrow;
@@ -26,6 +26,7 @@ pub mod expr;
pub mod header;
pub mod index;
pub mod namespace;
pub mod oauth;
pub mod permutation;
pub mod query;
pub mod runtime;
@@ -52,6 +53,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<AlterColumnsResult>()?;
m.add_class::<AddResult>()?;
m.add_class::<MergeResult>()?;
m.add_class::<LsmWriteSpec>()?;
m.add_class::<DeleteResult>()?;
m.add_class::<DropColumnsResult>()?;
m.add_class::<UpdateResult>()?;

53
python/src/oauth.rs Normal file
View File

@@ -0,0 +1,53 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use pyo3::FromPyObject;
use lancedb::remote::oauth::{OAuthConfig, OAuthFlow};
/// Python-side OAuth configuration, extracted via FromPyObject.
/// Maps to `lancedb.remote.oauth.OAuthConfig` Python dataclass.
#[derive(FromPyObject)]
pub struct PyOAuthConfig {
pub issuer_url: String,
pub client_id: String,
pub scopes: Vec<String>,
pub flow: String,
pub client_secret: Option<String>,
pub redirect_uri: Option<String>,
pub callback_port: Option<u16>,
pub managed_identity_client_id: Option<String>,
pub token_file: Option<String>,
pub refresh_buffer_secs: Option<u64>,
}
impl From<PyOAuthConfig> for OAuthConfig {
fn from(py: PyOAuthConfig) -> Self {
let flow = match py.flow.as_str() {
"client_credentials" => OAuthFlow::ClientCredentials,
"authorization_code_pkce" => OAuthFlow::AuthorizationCodePKCE {
redirect_uri: py.redirect_uri,
callback_port: py.callback_port,
},
"device_code" => OAuthFlow::DeviceCode,
"azure_managed_identity" => OAuthFlow::AzureManagedIdentity {
client_id: py.managed_identity_client_id,
},
"workload_identity" => OAuthFlow::WorkloadIdentity {
token_file: py
.token_file
.expect("token_file is required for workload_identity flow"),
},
other => panic!("Unknown OAuth flow type: {other}"),
};
OAuthConfig {
issuer_url: py.issuer_url,
client_id: py.client_id,
client_secret: py.client_secret,
scopes: py.scopes,
flow,
refresh_buffer_secs: py.refresh_buffer_secs,
}
}
}

View File

@@ -23,7 +23,7 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::QueryFilter;
use lancedb::query::{
ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
ColumnOrdering, ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
VectorQuery as LanceDbVectorQuery,
};
use lancedb::table::AnyQuery;
@@ -207,6 +207,48 @@ impl<'py> IntoPyObject<'py> for PyLanceDB<FtsQuery> {
#[derive(Clone)]
pub struct PyQueryVectors(Vec<Arc<dyn Array>>);
#[derive(Clone, FromPyObject)]
#[pyo3(from_item_all)]
pub struct PyColumnOrdering {
pub column_name: String,
pub ascending: bool,
pub nulls_first: bool,
}
impl From<ColumnOrdering> for PyColumnOrdering {
fn from(ordering: ColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl From<PyColumnOrdering> for ColumnOrdering {
fn from(ordering: PyColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl<'py> IntoPyObject<'py> for PyColumnOrdering {
type Target = PyDict;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;
fn into_pyobject(self, py: pyo3::Python<'py>) -> PyResult<Self::Output> {
let dict = PyDict::new(py);
dict.set_item("column_name", self.column_name)?;
dict.set_item("ascending", self.ascending)?;
dict.set_item("nulls_first", self.nulls_first)?;
Ok(dict)
}
}
impl<'py> IntoPyObject<'py> for PyQueryVectors {
type Target = PyList;
type Output = Bound<'py, Self::Target>;
@@ -246,6 +288,7 @@ pub struct PyQueryRequest {
pub bypass_vector_index: Option<bool>,
pub postfilter: Option<bool>,
pub norm: Option<String>,
pub order_by: Option<Vec<PyColumnOrdering>>,
}
impl From<AnyQuery> for PyQueryRequest {
@@ -273,6 +316,9 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: None,
postfilter: None,
norm: None,
order_by: query_request
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
AnyQuery::VectorQuery(vector_query) => Self {
limit: vector_query.base.limit,
@@ -297,6 +343,10 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: Some(!vector_query.use_index),
postfilter: Some(!vector_query.base.prefilter),
norm: vector_query.base.norm.map(|n| n.to_string()),
order_by: vector_query
.base
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
}
}
@@ -475,6 +525,13 @@ impl Query {
})
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[pyo3(signature = ())]
pub fn output_schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
@@ -647,6 +704,13 @@ impl FTSQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -782,6 +846,13 @@ impl VectorQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -954,6 +1025,12 @@ impl HybridQuery {
self.inner_fts.offset(offset);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
self.inner_vec.order_by(ordering.clone())?;
self.inner_fts.order_by(ordering)?;
Ok(())
}
pub fn fast_search(&mut self) {
self.inner_vec.fast_search();
self.inner_fts.fast_search();

View File

@@ -171,6 +171,141 @@ impl From<lancedb::table::MergeResult> for MergeResult {
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `merge_insert`.
///
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
/// `with_writer_config_defaults(...)`.
#[pyclass(from_py_object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
inner: lancedb::table::LsmWriteSpec,
}
#[pymethods]
impl LsmWriteSpec {
/// Hash-bucket sharding by the unenforced primary key column.
#[staticmethod]
pub fn bucket(column: String, num_buckets: u32) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
}
}
/// Identity sharding — shard by the raw value of `column`.
#[staticmethod]
pub fn identity(column: String) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::identity(column),
}
}
/// No sharding — every `merge_insert` call writes to a single
/// MemWAL shard.
#[staticmethod]
pub fn unsharded() -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::unsharded(),
}
}
/// Replace the list of indexes the MemWAL should keep up to date as
/// rows are appended. Each name must reference an index that
/// already exists on the table at the time `set_lsm_write_spec`
/// is called.
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
Self {
inner: self.inner.clone().with_maintained_indexes(indexes),
}
}
/// Replace the default `ShardWriter` configuration recorded in the
/// MemWAL index, so every writer starts from the same defaults.
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
Self {
inner: self.inner.clone().with_writer_config_defaults(defaults),
}
}
pub fn __repr__(&self) -> String {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket {
column,
num_buckets,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, num_buckets, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Identity {
column,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Unsharded {
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
maintained_indexes, writer_config_defaults,
),
}
}
/// Discriminator string identifying the variant ("bucket", "identity",
/// or "unsharded").
#[getter]
pub fn spec_type(&self) -> &'static str {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
}
}
/// Bucket and identity variants: the sharding column. `None` for unsharded.
#[getter]
pub fn column(&self) -> Option<String> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { column, .. }
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
}
}
/// Bucket variant only: the number of buckets.
#[getter]
pub fn num_buckets(&self) -> Option<u32> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
_ => None,
}
}
/// Names of indexes the MemWAL should keep up to date during writes.
#[getter]
pub fn maintained_indexes(&self) -> Vec<String> {
self.inner.maintained_indexes().to_vec()
}
/// Default `ShardWriter` configuration recorded by this spec.
#[getter]
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
self.inner.writer_config_defaults().clone()
}
}
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
fn from(spec: LsmWriteSpec) -> Self {
spec.inner
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
@@ -805,6 +940,37 @@ impl Table {
})
}
pub fn set_unenforced_primary_key<'a>(
self_: PyRef<'a, Self>,
columns: Vec<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.set_unenforced_primary_key(columns)
.await
.infer_error()
})
}
pub fn set_lsm_write_spec<'a>(
self_: PyRef<'a, Self>,
spec: LsmWriteSpec,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
future_into_py(self_.py(), async move {
inner.set_lsm_write_spec(native_spec).await.infer_error()
})
}
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.unset_lsm_write_spec().await.infer_error()
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.28.0-beta.11"
version = "0.29.1-beta.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -75,6 +75,11 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
# OAuth dependencies (used by remote feature)
sha2 = { version = "0.10", optional = true }
base64 = { version = "0.22", optional = true }
urlencoding = { version = "2", optional = true }
open = { version = "5", optional = true }
uuid = { version = "1.7.0", features = ["v4"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
@@ -128,7 +133,7 @@ huggingface = [
"lance-namespace-impls/dir-huggingface",
]
dynamodb = ["lance/dynamodb", "aws"]
remote = ["dep:reqwest", "dep:http", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
remote = ["dep:reqwest", "dep:http", "dep:sha2", "dep:base64", "dep:urlencoding", "dep:open", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
fp16kernels = ["lance-linalg/fp16kernels"]
s3-test = []
bedrock = ["dep:aws-sdk-bedrockruntime"]

View File

@@ -622,6 +622,8 @@ pub struct ConnectRequest {
pub struct ConnectBuilder {
request: ConnectRequest,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
#[cfg(feature = "remote")]
oauth_config: Option<crate::remote::oauth::OAuthConfig>,
}
#[cfg(feature = "remote")]
@@ -643,6 +645,8 @@ impl ConnectBuilder {
session: None,
},
embedding_registry: None,
#[cfg(feature = "remote")]
oauth_config: None,
}
}
@@ -731,6 +735,19 @@ impl ConnectBuilder {
self
}
/// Configure OAuth authentication for LanceDB Cloud/Enterprise.
///
/// This creates an [`OAuthHeaderProvider`](crate::remote::OAuthHeaderProvider)
/// from the given config and sets it as the header provider, replacing any
/// previously configured header provider or API key.
///
/// Token acquisition and refresh are handled entirely in Rust.
#[cfg(feature = "remote")]
pub fn oauth_config(mut self, config: crate::remote::oauth::OAuthConfig) -> Self {
self.oauth_config = Some(config);
self
}
/// Provide a custom [`EmbeddingRegistry`] to use for this connection.
pub fn embedding_registry(mut self, registry: Arc<dyn EmbeddingRegistry>) -> Self {
self.embedding_registry = Some(registry);
@@ -874,9 +891,29 @@ impl ConnectBuilder {
let region = options.region.ok_or_else(|| Error::InvalidInput {
message: "A region is required when connecting to LanceDb Cloud".to_string(),
})?;
let api_key = options.api_key.ok_or_else(|| Error::InvalidInput {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// When OAuth is configured, api_key is not required
let api_key = match (&self.oauth_config, &options.api_key) {
(Some(_), None) => String::new(),
(Some(_), Some(key)) => key.clone(),
(None, Some(key)) => key.clone(),
(None, None) => {
return Err(Error::InvalidInput {
message:
"An api_key or oauth_config is required when connecting to LanceDb Cloud"
.to_string(),
});
}
};
let mut client_config = self.request.client_config;
// Apply OAuth header provider if configured
if let Some(oauth_config) = self.oauth_config {
let provider = crate::remote::oauth::OAuthHeaderProvider::new(oauth_config)?;
client_config.header_provider =
Some(Arc::new(provider) as Arc<dyn crate::remote::client::HeaderProvider>);
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
@@ -884,7 +921,7 @@ impl ConnectBuilder {
&api_key,
&region,
options.host_override,
self.request.client_config,
client_config,
storage_options.into(),
)?);
Ok(Connection {

View File

@@ -450,6 +450,10 @@ impl PermutationReader {
}
pub async fn take_offsets(&self, offsets: &[u64], selection: Select) -> Result<RecordBatch> {
if offsets.is_empty() {
return Ok(RecordBatch::new_empty(self.output_schema(selection).await?));
}
if let Some(permutation_table) = &self.permutation_table {
let offset_map = self.get_offset_map(permutation_table).await?;
let row_ids = offsets
@@ -955,4 +959,62 @@ mod tests {
.to_vec();
assert_eq!(idx_values, &all_idx_values[4997..5000]);
}
#[tokio::test]
async fn test_take_offsets_empty_identity_reader() {
let base_table = lance_datagen::gen_batch()
.col("idx", lance_datagen::array::step::<Int32Type>())
.into_mem_table("tbl", RowCount::from(10), BatchCount::from(1))
.await;
let reader = PermutationReader::identity(base_table.base_table().clone()).await;
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "idx");
}
#[tokio::test]
async fn test_take_offsets_empty_with_permutation_table() {
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
let reader = PermutationReader::try_from_tables(
base_table.base_table().clone(),
row_ids_table.base_table().clone(),
0,
)
.await
.unwrap();
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.schema().fields().len(), 2);
assert_eq!(batch.schema().field(0).name(), "idx");
assert_eq!(batch.schema().field(1).name(), "other_col");
}
#[tokio::test]
async fn test_take_offsets_empty_with_column_selection() {
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
let reader = PermutationReader::try_from_tables(
base_table.base_table().clone(),
row_ids_table.base_table().clone(),
0,
)
.await
.unwrap();
let batch = reader
.take_offsets(&[], Select::Columns(vec!["idx".to_string()]))
.await
.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "idx");
}
}

View File

@@ -11,6 +11,8 @@ use datafusion_expr::Expr;
use datafusion_physical_plan::ExecutionPlan;
use futures::{FutureExt, TryFutureExt, TryStreamExt, stream, try_join};
use half::f16;
/// Re-export Lance ColumnOrdering type for use in query ordering
pub use lance::dataset::scanner::ColumnOrdering;
use lance::dataset::{ROW_ID, scanner::DatasetRecordBatchStream};
use lance_arrow::RecordBatchExt;
use lance_datafusion::exec::execute_plan;
@@ -510,6 +512,11 @@ pub trait QueryBase {
/// the scores are converted to ranks and then normalized. If "Score", the
/// scores are normalized directly.
fn norm(self, norm: NormalizeMethod) -> Self;
/// Sort the results by the specified column(s).
///
/// This allows ordering query results by one or more columns in either ascending or descending order.
fn order_by(self, ordering: Option<Vec<ColumnOrdering>>) -> Self;
}
pub trait HasQuery {
@@ -574,6 +581,11 @@ impl<T: HasQuery> QueryBase for T {
self.mut_query().norm = Some(norm);
self
}
fn order_by(mut self, ordering: Option<Vec<ColumnOrdering>>) -> Self {
self.mut_query().order_by = ordering;
self
}
}
/// Options for controlling the execution of a query
@@ -750,6 +762,11 @@ pub struct QueryRequest {
///
/// By default, this is false (scoring columns are auto-projected for backward compatibility).
pub disable_scoring_autoprojection: bool,
/// Sort the results by the specified column(s).
///
/// This allows ordering query results by one or more columns in either ascending or descending order.
pub order_by: Option<Vec<ColumnOrdering>>,
}
impl Default for QueryRequest {
@@ -766,6 +783,7 @@ impl Default for QueryRequest {
reranker: None,
norm: None,
disable_scoring_autoprojection: false,
order_by: None,
}
}
}

View File

@@ -8,6 +8,7 @@
pub(crate) mod client;
pub(crate) mod db;
pub mod oauth;
mod retry;
pub(crate) mod table;
pub(crate) mod util;
@@ -20,3 +21,4 @@ const JSON_CONTENT_TYPE: &str = "application/json";
pub use client::{ClientConfig, HeaderProvider, RetryConfig, TimeoutConfig, TlsConfig};
pub use db::{RemoteDatabaseOptions, RemoteDatabaseOptionsBuilder};
pub use oauth::{OAuthConfig, OAuthFlow, OAuthHeaderProvider};

View File

@@ -0,0 +1,906 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use log::{debug, info, warn};
use reqwest::Client;
use serde::Deserialize;
use tokio::sync::RwLock;
use crate::error::{Error, Result};
use crate::remote::client::HeaderProvider;
const DEFAULT_REFRESH_BUFFER_SECS: u64 = 300;
const DEFAULT_CALLBACK_PORT: u16 = 8400;
const AZURE_IMDS_ENDPOINT: &str = "http://169.254.169.254/metadata/identity/oauth2/token";
const AZURE_IMDS_API_VERSION: &str = "2018-02-01";
/// OAuth authentication flow configuration.
#[derive(Debug, Clone)]
pub enum OAuthFlow {
/// Client Credentials grant (service-to-service / M2M).
/// Requires `client_secret` in [`OAuthConfig`].
ClientCredentials,
/// Authorization Code with PKCE (interactive browser-based auth).
AuthorizationCodePKCE {
/// Redirect URI (default: `http://localhost:{callback_port}/callback`)
redirect_uri: Option<String>,
/// Port for the local HTTP callback server (default: 8400)
callback_port: Option<u16>,
},
/// Device Code grant (CLI / headless environments).
/// Displays a verification URL and user code for out-of-band authentication.
DeviceCode,
/// Azure Managed Identity via IMDS.
/// Works on Azure VMs, AKS, App Service, and Azure Functions.
AzureManagedIdentity {
/// Client ID for user-assigned managed identity.
/// Omit for system-assigned managed identity.
client_id: Option<String>,
},
/// Workload Identity Federation.
/// Exchanges a platform token (K8s service account, GitHub OIDC) for an IdP token.
WorkloadIdentity {
/// Path to the federated token file
/// (e.g. `AZURE_FEDERATED_TOKEN_FILE`).
token_file: String,
},
}
/// OAuth configuration for LanceDB authentication.
///
/// All token acquisition and refresh is handled in the Rust layer.
/// Python and TypeScript bindings expose this as a plain config object.
#[derive(Debug, Clone)]
pub struct OAuthConfig {
/// OIDC issuer URL or OAuth authority URL.
/// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
pub issuer_url: String,
/// Application / Client ID.
pub client_id: String,
/// Client secret (required for `ClientCredentials`, optional for others).
pub client_secret: Option<String>,
/// OAuth scopes to request.
/// For Azure: `["api://{app_id}/.default"]`
pub scopes: Vec<String>,
/// Authentication flow to use.
pub flow: OAuthFlow,
/// Seconds before token expiry to trigger proactive refresh (default: 300).
pub refresh_buffer_secs: Option<u64>,
}
// -- OIDC Discovery --
#[derive(Debug, Deserialize)]
struct OidcDiscovery {
token_endpoint: String,
authorization_endpoint: Option<String>,
device_authorization_endpoint: Option<String>,
}
// -- Token Response --
#[derive(Debug, Deserialize)]
struct TokenResponse {
access_token: String,
#[serde(default)]
refresh_token: Option<String>,
/// Token lifetime in seconds.
/// Some providers (Azure IMDS) return this as a string, so we accept both.
#[serde(default, deserialize_with = "deserialize_optional_u64_or_string")]
expires_in: Option<u64>,
#[serde(default)]
#[allow(dead_code)]
token_type: Option<String>,
}
fn deserialize_optional_u64_or_string<'de, D>(
deserializer: D,
) -> std::result::Result<Option<u64>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de;
struct U64OrString;
impl<'de> de::Visitor<'de> for U64OrString {
type Value = Option<u64>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a u64, a numeric string, or null")
}
fn visit_u64<E: de::Error>(self, v: u64) -> std::result::Result<Self::Value, E> {
Ok(Some(v))
}
fn visit_i64<E: de::Error>(self, v: i64) -> std::result::Result<Self::Value, E> {
Ok(Some(v as u64))
}
fn visit_str<E: de::Error>(self, v: &str) -> std::result::Result<Self::Value, E> {
v.parse::<u64>().map(Some).map_err(de::Error::custom)
}
fn visit_none<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
Ok(None)
}
fn visit_unit<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
Ok(None)
}
}
deserializer.deserialize_any(U64OrString)
}
// -- Device Code Response --
#[derive(Debug, Deserialize)]
struct DeviceCodeResponse {
device_code: String,
user_code: String,
verification_uri: String,
#[serde(default)]
verification_uri_complete: Option<String>,
expires_in: u64,
interval: Option<u64>,
}
// -- Internal Token State --
struct TokenState {
access_token: Option<String>,
refresh_token: Option<String>,
expires_at: Option<Instant>,
}
impl TokenState {
fn new() -> Self {
Self {
access_token: None,
refresh_token: None,
expires_at: None,
}
}
fn is_expired(&self, buffer: Duration) -> bool {
match (self.access_token.as_ref(), self.expires_at) {
(Some(_), Some(expires_at)) => Instant::now() + buffer >= expires_at,
(None, _) => true,
(Some(_), None) => false, // no expiry info, assume valid
}
}
fn update(&mut self, resp: &TokenResponse) {
self.access_token = Some(resp.access_token.clone());
if resp.refresh_token.is_some() {
self.refresh_token = resp.refresh_token.clone();
}
self.expires_at = resp
.expires_in
.map(|secs| Instant::now() + Duration::from_secs(secs));
}
}
/// OAuth header provider that manages the full token lifecycle.
///
/// Implements [`HeaderProvider`] to inject `Authorization: Bearer <token>`
/// headers into every LanceDB request, with automatic token refresh.
pub struct OAuthHeaderProvider {
config: OAuthConfig,
http_client: Client,
token_state: Arc<RwLock<TokenState>>,
/// Cached OIDC discovery document
discovery: Arc<RwLock<Option<OidcDiscovery>>>,
refresh_buffer: Duration,
}
impl std::fmt::Debug for OAuthHeaderProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OAuthHeaderProvider")
.field("issuer_url", &self.config.issuer_url)
.field("client_id", &self.config.client_id)
.field("flow", &self.config.flow)
.finish()
}
}
impl OAuthHeaderProvider {
/// Create a new OAuth header provider from configuration.
pub fn new(config: OAuthConfig) -> Result<Self> {
// Validate config upfront
if matches!(config.flow, OAuthFlow::ClientCredentials) && config.client_secret.is_none() {
return Err(Error::InvalidInput {
message: "client_secret is required for ClientCredentials flow".to_string(),
});
}
if config.scopes.is_empty() {
return Err(Error::InvalidInput {
message: "At least one OAuth scope is required".to_string(),
});
}
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| Error::Runtime {
message: format!("Failed to create HTTP client for OAuth: {e}"),
})?;
let refresh_buffer = Duration::from_secs(
config
.refresh_buffer_secs
.unwrap_or(DEFAULT_REFRESH_BUFFER_SECS),
);
Ok(Self {
config,
http_client,
token_state: Arc::new(RwLock::new(TokenState::new())),
discovery: Arc::new(RwLock::new(None)),
refresh_buffer,
})
}
/// Get a valid access token, refreshing if necessary.
async fn get_valid_token(&self) -> Result<String> {
// Fast path: check if current token is still valid
{
let state = self.token_state.read().await;
if !state.is_expired(self.refresh_buffer)
&& let Some(ref token) = state.access_token
{
return Ok(token.clone());
}
}
// Slow path: acquire or refresh token
let mut state = self.token_state.write().await;
// Double-check after acquiring write lock
if !state.is_expired(self.refresh_buffer)
&& let Some(ref token) = state.access_token
{
return Ok(token.clone());
}
let uses_refresh_token = !matches!(
self.config.flow,
OAuthFlow::ClientCredentials
| OAuthFlow::AzureManagedIdentity { .. }
| OAuthFlow::WorkloadIdentity { .. }
);
let resp = if let Some(ref refresh_token) = state.refresh_token
&& uses_refresh_token
{
debug!("Refreshing OAuth token using refresh_token");
self.refresh_with_token(refresh_token).await?
} else {
debug!("Acquiring new OAuth token via {:?} flow", self.config.flow);
self.acquire_token().await?
};
state.update(&resp);
Ok(resp.access_token)
}
/// Acquire a new token using the configured flow.
async fn acquire_token(&self) -> Result<TokenResponse> {
match &self.config.flow {
OAuthFlow::ClientCredentials => self.acquire_client_credentials().await,
OAuthFlow::AuthorizationCodePKCE {
redirect_uri,
callback_port,
} => {
self.acquire_authorization_code_pkce(
redirect_uri.as_deref(),
callback_port.unwrap_or(DEFAULT_CALLBACK_PORT),
)
.await
}
OAuthFlow::DeviceCode => self.acquire_device_code().await,
OAuthFlow::AzureManagedIdentity { client_id } => {
self.acquire_managed_identity(client_id.as_deref()).await
}
OAuthFlow::WorkloadIdentity { token_file } => {
self.acquire_workload_identity(token_file).await
}
}
}
// -- OIDC Discovery --
async fn get_discovery(&self) -> Result<OidcDiscovery> {
{
let cached = self.discovery.read().await;
if let Some(ref disc) = *cached {
return Ok(OidcDiscovery {
token_endpoint: disc.token_endpoint.clone(),
authorization_endpoint: disc.authorization_endpoint.clone(),
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
});
}
}
let mut cache = self.discovery.write().await;
// Double-check
if let Some(ref disc) = *cache {
return Ok(OidcDiscovery {
token_endpoint: disc.token_endpoint.clone(),
authorization_endpoint: disc.authorization_endpoint.clone(),
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
});
}
let discovery_url = format!(
"{}/.well-known/openid-configuration",
self.config.issuer_url.trim_end_matches('/')
);
debug!("Fetching OIDC discovery from {}", discovery_url);
let resp = self
.http_client
.get(&discovery_url)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to fetch OIDC discovery document: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"OIDC discovery failed with status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
let disc: OidcDiscovery = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse OIDC discovery document: {e}"),
})?;
let result = OidcDiscovery {
token_endpoint: disc.token_endpoint.clone(),
authorization_endpoint: disc.authorization_endpoint.clone(),
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
};
*cache = Some(disc);
Ok(result)
}
fn get_token_endpoint_from_issuer(&self) -> String {
// Derive Azure v2.0 token endpoint from issuer URL
// issuer: https://login.microsoftonline.com/{tenant}/v2.0
// token: https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token
let base = self.config.issuer_url.trim_end_matches("/v2.0");
format!("{base}/oauth2/v2.0/token")
}
async fn get_token_endpoint(&self) -> Result<String> {
match self.get_discovery().await {
Ok(disc) => Ok(disc.token_endpoint),
Err(e) => {
warn!("OIDC discovery failed, falling back to derived endpoint: {e}");
Ok(self.get_token_endpoint_from_issuer())
}
}
}
fn scopes_string(&self) -> String {
self.config.scopes.join(" ")
}
// -- Client Credentials Flow --
async fn acquire_client_credentials(&self) -> Result<TokenResponse> {
let client_secret = self
.config
.client_secret
.as_ref()
.ok_or(Error::InvalidInput {
message: "client_secret is required for ClientCredentials flow".to_string(),
})?;
let token_endpoint = self.get_token_endpoint().await?;
let params = [
("grant_type", "client_credentials"),
("client_id", &self.config.client_id),
("client_secret", client_secret),
("scope", &self.scopes_string()),
];
self.post_token_request(&token_endpoint, &params).await
}
// -- Authorization Code + PKCE Flow --
async fn acquire_authorization_code_pkce(
&self,
redirect_uri: Option<&str>,
callback_port: u16,
) -> Result<TokenResponse> {
use rand::Rng;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
let discovery = self.get_discovery().await?;
let auth_endpoint = discovery.authorization_endpoint.ok_or(Error::Runtime {
message: "OIDC discovery did not provide authorization_endpoint".to_string(),
})?;
let default_redirect = format!("http://localhost:{callback_port}/callback");
let redirect = redirect_uri.unwrap_or(&default_redirect);
// Generate PKCE code verifier and challenge (S256)
const PKCE_CHARSET: &[u8] =
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~";
let code_verifier: String = {
let mut rng = rand::rng();
(0..128)
.map(|_| {
let idx = rng.random_range(0..PKCE_CHARSET.len());
PKCE_CHARSET[idx] as char
})
.collect()
};
let code_challenge = {
use sha2::{Digest, Sha256};
let hash = Sha256::digest(code_verifier.as_bytes());
base64_url_encode(&hash)
};
let state: String = {
let mut rng = rand::rng();
(0..32)
.map(|_| {
let idx = rng.random_range(0..16u8);
b"0123456789abcdef"[idx as usize] as char
})
.collect()
};
// Build authorization URL
let auth_url = format!(
"{auth_endpoint}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={state}",
urlencoding::encode(&self.config.client_id),
urlencoding::encode(redirect),
urlencoding::encode(&self.scopes_string()),
urlencoding::encode(&code_challenge),
);
info!("Opening browser for OAuth login...");
info!("If the browser doesn't open, visit: {auth_url}");
// Try to open browser
let _ = open::that(&auth_url);
// Start local callback server
let listener = TcpListener::bind(format!("127.0.0.1:{callback_port}"))
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to bind callback server on port {callback_port}: {e}"),
})?;
info!("Waiting for OAuth callback on port {callback_port}...");
let (mut stream, _) = listener.accept().await.map_err(|e| Error::Runtime {
message: format!("Failed to accept callback connection: {e}"),
})?;
// Read the HTTP request
let mut buf = vec![0u8; 4096];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to read callback request: {e}"),
})?;
let request_str = String::from_utf8_lossy(&buf[..n]);
// Extract authorization code from query params
let code = extract_query_param(&request_str, "code").ok_or(Error::Runtime {
message: "No authorization code in callback".to_string(),
})?;
let returned_state = extract_query_param(&request_str, "state");
if returned_state.as_deref() != Some(&state) {
return Err(Error::Runtime {
message: "OAuth state mismatch — possible CSRF attack".to_string(),
});
}
// Send success response to browser
let response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<html><body><h2>Authentication successful!</h2><p>You can close this window.</p></body></html>";
let _ = stream.write_all(response.as_bytes()).await;
// Exchange code for tokens
let token_endpoint = self.get_token_endpoint().await?;
let mut params = vec![
("grant_type", "authorization_code"),
("client_id", self.config.client_id.as_str()),
("code", &code),
("redirect_uri", redirect),
("code_verifier", &code_verifier),
];
if let Some(ref secret) = self.config.client_secret {
params.push(("client_secret", secret));
}
self.post_token_request(&token_endpoint, &params).await
}
// -- Device Code Flow --
async fn acquire_device_code(&self) -> Result<TokenResponse> {
let discovery = self.get_discovery().await?;
let device_endpoint = discovery
.device_authorization_endpoint
.ok_or(Error::Runtime {
message: "OIDC discovery did not provide device_authorization_endpoint".to_string(),
})?;
let params = [
("client_id", self.config.client_id.as_str()),
("scope", &self.scopes_string()),
];
let resp = self
.http_client
.post(&device_endpoint)
.form(&params)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Device code request failed: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"Device code request failed with status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
let device_resp: DeviceCodeResponse = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse device code response: {e}"),
})?;
// Display instructions to user
info!(
"To sign in, visit {} and enter code: {}",
device_resp.verification_uri, device_resp.user_code
);
if let Some(ref uri) = device_resp.verification_uri_complete {
info!("Or visit: {uri}");
}
// Poll token endpoint
let token_endpoint = self.get_token_endpoint().await?;
let poll_interval = Duration::from_secs(device_resp.interval.unwrap_or(5));
let deadline = Instant::now() + Duration::from_secs(device_resp.expires_in);
loop {
if Instant::now() >= deadline {
return Err(Error::Runtime {
message: "Device code flow timed out waiting for user authentication"
.to_string(),
});
}
tokio::time::sleep(poll_interval).await;
let poll_params = [
("grant_type", "urn:ietf:params:oauth:grant-type:device_code"),
("client_id", self.config.client_id.as_str()),
("device_code", &device_resp.device_code),
];
let poll_resp = self
.http_client
.post(&token_endpoint)
.form(&poll_params)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Device code poll failed: {e}"),
})?;
if poll_resp.status().is_success() {
return poll_resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse token response: {e}"),
});
}
// Check for pending / slow_down errors
let body = poll_resp.text().await.unwrap_or_default();
if body.contains("authorization_pending") {
continue;
}
if body.contains("slow_down") {
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
return Err(Error::Runtime {
message: format!("Device code poll failed: {body}"),
});
}
}
// -- Azure Managed Identity Flow --
async fn acquire_managed_identity(&self, mi_client_id: Option<&str>) -> Result<TokenResponse> {
let resource = self.scopes_string().replace("/.default", "");
let mut url = format!(
"{AZURE_IMDS_ENDPOINT}?api-version={AZURE_IMDS_API_VERSION}&resource={}",
urlencoding::encode(&resource),
);
if let Some(cid) = mi_client_id {
url.push_str(&format!("&client_id={}", urlencoding::encode(cid)));
}
let resp = self
.http_client
.get(&url)
.header("Metadata", "true")
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Azure IMDS request failed: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"Azure IMDS returned status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse IMDS token response: {e}"),
})
}
// -- Workload Identity Federation Flow --
async fn acquire_workload_identity(&self, token_file: &str) -> Result<TokenResponse> {
let federated_token =
tokio::fs::read_to_string(token_file)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to read federated token file '{token_file}': {e}"),
})?;
let token_endpoint = self.get_token_endpoint().await?;
let params = [
("grant_type", "client_credentials"),
("client_id", self.config.client_id.as_str()),
(
"client_assertion_type",
"urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
),
("client_assertion", federated_token.trim()),
("scope", &self.scopes_string()),
];
self.post_token_request(&token_endpoint, &params).await
}
// -- Refresh Token Flow --
async fn refresh_with_token(&self, refresh_token: &str) -> Result<TokenResponse> {
let token_endpoint = self.get_token_endpoint().await?;
let mut params = vec![
("grant_type", "refresh_token"),
("client_id", self.config.client_id.as_str()),
("refresh_token", refresh_token),
];
if let Some(ref secret) = self.config.client_secret {
params.push(("client_secret", secret.as_str()));
}
self.post_token_request(&token_endpoint, &params).await
}
// -- Shared Helpers --
async fn post_token_request(
&self,
endpoint: &str,
params: &[(&str, &str)],
) -> Result<TokenResponse> {
let resp = self
.http_client
.post(endpoint)
.form(params)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Token request to {endpoint} failed: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"Token request failed with status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse token response: {e}"),
})
}
}
#[async_trait::async_trait]
impl HeaderProvider for OAuthHeaderProvider {
async fn get_headers(&self) -> Result<HashMap<String, String>> {
let token = self.get_valid_token().await?;
Ok(HashMap::from([(
"authorization".to_string(),
format!("Bearer {token}"),
)]))
}
}
// -- Utility functions --
fn base64_url_encode(input: &[u8]) -> String {
use base64::Engine;
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(input)
}
/// Extract a query parameter value from a raw HTTP GET request line.
fn extract_query_param(request: &str, param: &str) -> Option<String> {
let first_line = request.lines().next()?;
let path = first_line.split_whitespace().nth(1)?;
let query = path.split('?').nth(1)?;
for pair in query.split('&') {
let mut kv = pair.splitn(2, '=');
if let (Some(key), Some(value)) = (kv.next(), kv.next())
&& key == param
{
return Some(urlencoding::decode(value).ok()?.into_owned());
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_query_param() {
let request = "GET /callback?code=abc123&state=xyz HTTP/1.1\r\nHost: localhost\r\n";
assert_eq!(
extract_query_param(request, "code"),
Some("abc123".to_string())
);
assert_eq!(
extract_query_param(request, "state"),
Some("xyz".to_string())
);
assert_eq!(extract_query_param(request, "missing"), None);
}
#[test]
fn test_extract_query_param_encoded() {
let request = "GET /callback?code=abc%20123&state=x%26y HTTP/1.1\r\n";
assert_eq!(
extract_query_param(request, "code"),
Some("abc 123".to_string())
);
assert_eq!(
extract_query_param(request, "state"),
Some("x&y".to_string())
);
}
#[test]
fn test_token_state_expiry() {
let mut state = TokenState::new();
assert!(state.is_expired(Duration::from_secs(0)));
state.access_token = Some("tok".to_string());
state.expires_at = Some(Instant::now() + Duration::from_secs(600));
assert!(!state.is_expired(Duration::from_secs(300)));
assert!(state.is_expired(Duration::from_secs(601)));
}
#[test]
fn test_base64_url_encode() {
let input = b"hello world";
let encoded = base64_url_encode(input);
assert!(!encoded.contains('+'));
assert!(!encoded.contains('/'));
assert!(!encoded.contains('='));
}
#[test]
fn test_scopes_string() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: Some("secret".to_string()),
scopes: vec!["scope1".to_string(), "scope2".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let provider = OAuthHeaderProvider::new(config).unwrap();
assert_eq!(provider.scopes_string(), "scope1 scope2");
}
#[test]
fn test_token_endpoint_derivation() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/my-tenant/v2.0".to_string(),
client_id: "id".to_string(),
client_secret: None,
scopes: vec!["api://test/.default".to_string()],
flow: OAuthFlow::DeviceCode,
refresh_buffer_secs: None,
};
let provider = OAuthHeaderProvider::new(config).unwrap();
assert_eq!(
provider.get_token_endpoint_from_issuer(),
"https://login.microsoftonline.com/my-tenant/oauth2/v2.0/token"
);
}
#[test]
fn test_client_credentials_requires_secret() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: None,
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
assert!(OAuthHeaderProvider::new(config).is_err());
}
#[test]
fn test_empty_scopes_rejected() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: None,
scopes: vec![],
flow: OAuthFlow::DeviceCode,
refresh_buffer_secs: None,
};
assert!(OAuthHeaderProvider::new(config).is_err());
}
}

View File

@@ -518,6 +518,21 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
if let Some(order_by) = &params.order_by {
body["order_by"] = serde_json::Value::Array(
order_by
.iter()
.map(|o| {
serde_json::json!({
"column_name": o.column_name,
"ascending": o.ascending,
"nulls_first": o.nulls_first,
})
})
.collect(),
);
}
Ok(())
}
@@ -1652,6 +1667,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(merge_insert_response)
}
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
Err(Error::NotSupported {
message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(),
})
}
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
Err(Error::NotSupported {
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
}
async fn unset_lsm_write_spec(&self) -> Result<()> {
Err(Error::NotSupported {
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
Ok(Box::new(RemoteTags { inner: self }))
}
@@ -2078,7 +2111,7 @@ mod tests {
use crate::{
DistanceType, Error, Table,
index::{Index, IndexStatistics, IndexType, vector::IvfPqIndexBuilder},
query::{ExecutableQuery, QueryBase},
query::{ColumnOrdering, ExecutableQuery, QueryBase},
remote::ARROW_FILE_CONTENT_TYPE,
};
@@ -2988,6 +3021,18 @@ mod tests {
"distance_type": "cosine",
"bypass_vector_index": true,
"columns": ["a", "b"],
"order_by": [
{
"column_name": "score",
"ascending": false,
"nulls_first": true,
},
{
"column_name": "id",
"ascending": true,
"nulls_first": false,
}
],
"nprobes": 12,
"minimum_nprobes": 12,
"maximum_nprobes": 12,
@@ -3019,6 +3064,10 @@ mod tests {
.limit(42)
.offset(10)
.select(Select::columns(&["a", "b"]))
.order_by(Some(vec![
ColumnOrdering::desc_nulls_first("score".to_string()),
ColumnOrdering::asc_nulls_last("id".to_string()),
]))
.nearest_to(vec![0.1, 0.2, 0.3])
.unwrap()
.column("my_vector")

View File

@@ -74,6 +74,7 @@ pub(crate) mod dataset;
pub mod delete;
pub mod merge;
pub mod optimize;
mod primary_key;
pub mod query;
pub mod schema_evolution;
pub mod update;
@@ -272,6 +273,176 @@ pub trait Tags: Send + Sync {
pub use self::merge::MergeResult;
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `merge_insert`.
///
/// Construct via [`LsmWriteSpec::bucket`], [`LsmWriteSpec::identity`], or
/// [`LsmWriteSpec::unsharded`], then optionally chain
/// [`LsmWriteSpec::with_maintained_indexes`] (indexes the MemWAL keeps up to
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
/// `ShardWriter` configuration recorded in the MemWAL index).
///
/// All variants require the table to have an unenforced primary key.
///
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
/// onto the MemWAL writer is a follow-up.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LsmWriteSpec {
/// Hash-bucket sharding by the unenforced primary key column.
///
/// `column` must equal the table's currently-set single-column
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
/// `bucket(column, num_buckets)` value is stable across processes.
Bucket {
column: String,
num_buckets: u32,
/// Names of indexes (already created on the table) that the
/// MemWAL should maintain in-memory as rows are appended.
maintained_indexes: Vec<String>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
writer_config_defaults: HashMap<String, String>,
},
/// Identity sharding — shard by the raw value of `column`.
///
/// Use this when the data is already partitioned by `column`; each
/// distinct value of `column` becomes its own shard.
Identity {
column: String,
/// Names of indexes (already created on the table) that the
/// MemWAL should maintain in-memory as rows are appended.
maintained_indexes: Vec<String>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
writer_config_defaults: HashMap<String, String>,
},
/// No sharding — every `merge_insert` call writes to a single MemWAL shard.
Unsharded {
/// Names of indexes (already created on the table) that the
/// MemWAL should maintain in-memory as rows are appended.
maintained_indexes: Vec<String>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
writer_config_defaults: HashMap<String, String>,
},
}
impl LsmWriteSpec {
/// Construct a hash-bucket sharding spec with no maintained indexes.
pub fn bucket(column: impl Into<String>, num_buckets: u32) -> Self {
Self::Bucket {
column: column.into(),
num_buckets,
maintained_indexes: Vec::new(),
writer_config_defaults: HashMap::new(),
}
}
/// Construct an identity-sharding spec (shard by the raw value of
/// `column`) with no maintained indexes.
pub fn identity(column: impl Into<String>) -> Self {
Self::Identity {
column: column.into(),
maintained_indexes: Vec::new(),
writer_config_defaults: HashMap::new(),
}
}
/// Construct an unsharded spec with no maintained indexes.
pub fn unsharded() -> Self {
Self::Unsharded {
maintained_indexes: Vec::new(),
writer_config_defaults: HashMap::new(),
}
}
/// Replace the list of indexes the MemWAL should keep up to date as
/// rows are appended. Each name must reference an index that already
/// exists on the table at the time `set_lsm_write_spec` is called.
pub fn with_maintained_indexes<I, S>(mut self, indexes: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let v: Vec<String> = indexes.into_iter().map(Into::into).collect();
match &mut self {
Self::Bucket {
maintained_indexes, ..
}
| Self::Identity {
maintained_indexes, ..
}
| Self::Unsharded {
maintained_indexes, ..
} => *maintained_indexes = v,
}
self
}
/// Replace the default `ShardWriter` configuration recorded in the MemWAL
/// index, so every writer starts from the same defaults. Keys are
/// `ShardWriter` config field names (`Duration` knobs use a `_ms` suffix);
/// values are their string encodings.
pub fn with_writer_config_defaults<I, K, V>(mut self, defaults: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
let m: HashMap<String, String> = defaults
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect();
match &mut self {
Self::Bucket {
writer_config_defaults,
..
}
| Self::Identity {
writer_config_defaults,
..
}
| Self::Unsharded {
writer_config_defaults,
..
} => *writer_config_defaults = m,
}
self
}
/// Borrow the list of index names this spec asks MemWAL to maintain.
pub fn maintained_indexes(&self) -> &[String] {
match self {
Self::Bucket {
maintained_indexes, ..
}
| Self::Identity {
maintained_indexes, ..
}
| Self::Unsharded {
maintained_indexes, ..
} => maintained_indexes,
}
}
/// Borrow the default `ShardWriter` configuration recorded by this spec.
pub fn writer_config_defaults(&self) -> &HashMap<String, String> {
match self {
Self::Bucket {
writer_config_defaults,
..
}
| Self::Identity {
writer_config_defaults,
..
}
| Self::Unsharded {
writer_config_defaults,
..
} => writer_config_defaults,
}
}
}
/// A trait for anything "table-like". This is used for both native tables (which target
/// Lance datasets) and remote tables (which target LanceDB cloud)
///
@@ -345,6 +516,43 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult>;
/// Set the unenforced primary key for the table to a single column.
///
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
/// column is recorded in the schema as the primary key for use by
/// features such as `merge_insert`. Only single-column primary keys are
/// supported, and the key cannot be changed once set.
///
/// The default implementation returns `NotSupported`; table types
/// backed by a Lance dataset override it.
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
Err(Error::NotSupported {
message: "set_unenforced_primary_key is not supported on this table type".into(),
})
}
/// Install an [`LsmWriteSpec`] on this table.
///
/// The spec selects Lance's MemWAL LSM-style write path for future
/// `merge_insert` calls.
///
/// The default implementation returns `NotSupported`. Implementations
/// that support the MemWAL LSM write path must override this.
async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> {
Err(Error::NotSupported {
message: "set_lsm_write_spec is not supported on this table type".into(),
})
}
/// Remove the [`LsmWriteSpec`] from this table.
///
/// This is a no-op if no spec is currently set.
///
/// The default implementation returns `NotSupported`. Implementations
/// that support the MemWAL LSM write path must override this.
async fn unset_lsm_write_spec(&self) -> Result<()> {
Err(Error::NotSupported {
message: "unset_lsm_write_spec is not supported on this table type".into(),
})
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -1063,6 +1271,68 @@ impl Table {
self.inner.drop_columns(columns).await
}
/// Set the unenforced primary key for this table to a single column.
///
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
/// column is recorded in the schema as the primary key so that features
/// such as `merge_insert` can use it.
///
/// Only single-column primary keys are supported, and the key cannot be
/// changed once set — calling this on a table that already has an
/// unenforced primary key fails. `columns` is an iterable for binding
/// ergonomics but must yield exactly one column:
///
/// - `table.set_unenforced_primary_key(["id"])`
pub async fn set_unenforced_primary_key<I, S>(&self, columns: I) -> Result<()>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let owned: Vec<String> = columns.into_iter().map(Into::into).collect();
let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
self.inner.set_unenforced_primary_key(&borrowed).await
}
/// Install an [`LsmWriteSpec`] on this table, selecting Lance's MemWAL
/// LSM-style write path for future `merge_insert` calls.
///
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
///
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
/// unenforced primary key.
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
///
/// All variants require the table to have an unenforced primary key
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
/// requires it to be the single column being bucketed.
///
/// # Example
///
/// ```
/// # use lancedb::table::{LsmWriteSpec, Table};
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// table.set_unenforced_primary_key(["id"]).await?;
/// table
/// .set_lsm_write_spec(
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
self.inner.set_lsm_write_spec(spec).await
}
/// Remove the [`LsmWriteSpec`] from this table, reverting to the standard
/// `merge_insert` write path.
///
/// Errors if no spec is currently set.
pub async fn unset_lsm_write_spec(&self) -> Result<()> {
self.inner.unset_lsm_write_spec().await
}
/// Retrieve the version of the table
///
/// LanceDb supports versioning. Every operation that modifies the table increases
@@ -2469,6 +2739,18 @@ impl BaseTable for NativeTable {
merge::execute_merge_insert(self, params, new_data).await
}
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
primary_key::set_unenforced_primary_key(self, columns).await
}
async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
merge::lsm::set_lsm_write_spec(self, spec).await
}
async fn unset_lsm_write_spec(&self) -> Result<()> {
merge::lsm::unset_lsm_write_spec(self).await
}
/// Delete rows from the table
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
// Delegate to the submodule implementation
@@ -2770,6 +3052,7 @@ mod tests {
use futures::TryStreamExt;
use lance::Dataset;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
use tempfile::tempdir;
use super::*;
@@ -3864,6 +4147,395 @@ mod tests {
);
}
#[tokio::test]
async fn test_set_unenforced_primary_key() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
Field::new("score", DataType::Float64, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])),
],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Reject empty input.
let err = table
.set_unenforced_primary_key(Vec::<&str>::new())
.await
.expect_err("empty input should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
// Reject compound (multi-column) input.
let err = table
.set_unenforced_primary_key(["id", "name"])
.await
.expect_err("compound primary key should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
// Reject unknown column.
let err = table
.set_unenforced_primary_key(["nonexistent"])
.await
.expect_err("nonexistent column should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
// Reject unsupported dtype (Float64).
let err = table
.set_unenforced_primary_key(["score"])
.await
.expect_err("Float64 should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
// None of the rejected calls set a primary key.
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
assert!(lance_schema.unenforced_primary_key().is_empty());
// Happy path: set the primary key to "id".
table.set_unenforced_primary_key(["id"]).await.unwrap();
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
let pk = lance_schema.unenforced_primary_key();
assert_eq!(pk.len(), 1);
assert_eq!(pk[0].name, "id");
// Position metadata is 1-indexed.
assert_eq!(
pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION),
Some(&"1".to_string())
);
// The primary key is immutable: re-setting it is rejected, whether to
// the same column or a different one.
let err = table
.set_unenforced_primary_key(["id"])
.await
.expect_err("re-setting the same primary key should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
let err = table
.set_unenforced_primary_key(["name"])
.await
.expect_err("changing the primary key should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
// The primary key is unchanged after the rejected calls.
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
let pk = lance_schema.unenforced_primary_key();
assert_eq!(pk.len(), 1);
assert_eq!(pk[0].name, "id");
}
#[tokio::test]
async fn test_set_unenforced_primary_key_concurrent() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
// A long read-consistency interval keeps each handle pinned to the
// version it opened, so the second handle commits against a stale
// base — the same situation as two processes racing.
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(3600))
.execute()
.await
.unwrap();
conn.create_table("t", reader).execute().await.unwrap();
let table_a = conn.open_table("t").execute().await.unwrap();
let table_b = conn.open_table("t").execute().await.unwrap();
// Handle A sets the primary key first.
table_a.set_unenforced_primary_key(["id"]).await.unwrap();
// Handle B committed against a stale base that had no primary key, so
// its own up-front check did not see A's key. The commit itself must
// still fail rather than silently overriding A's primary key. (The
// cross-process race on a *different* column is caught by the Lance
// commit layer.)
let err = table_b
.set_unenforced_primary_key(["id"])
.await
.expect_err("concurrent primary key commit on a stale base should fail");
assert!(
!matches!(err, Error::InvalidInput { .. }),
"expected a commit-time conflict, not an up-front input error: {:?}",
err
);
// The committed primary key is exactly what A set — no corruption.
let fresh = conn.open_table("t").execute().await.unwrap();
let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema;
let pk = lance_schema.unenforced_primary_key();
assert_eq!(pk.len(), 1);
assert_eq!(pk[0].name, "id");
}
#[tokio::test]
async fn test_set_lsm_write_spec() {
use arrow_array::StringArray;
use lance::dataset::mem_wal::DatasetMemWalExt;
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Reject when no PK is set.
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await
.expect_err("should reject without PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Set PK, then a mismatched column on the spec must be rejected.
table.set_unenforced_primary_key(["id"]).await.unwrap();
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
.await
.expect_err("should reject column != PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Reject num_buckets out of range.
for bad in [0u32, 1025] {
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", bad))
.await
.expect_err("should reject");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
}
// Happy path: install spec; verify MemWAL details record it.
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await
.unwrap();
let native_tbl = table.as_native().unwrap();
let dataset = native_tbl.dataset.get().await.unwrap();
let details = dataset
.mem_wal_index_details()
.await
.unwrap()
.expect("MemWAL index should be initialized");
assert_eq!(details.num_shards, 4);
assert_eq!(details.sharding_specs.len(), 1);
let installed = &details.sharding_specs[0];
assert_eq!(installed.fields.len(), 1);
let f = &installed.fields[0];
assert_eq!(f.transform.as_deref(), Some("bucket"));
assert_eq!(
f.parameters.get("num_buckets").map(String::as_str),
Some("4")
);
// Bucket parameters must hold only `num_buckets`.
assert_eq!(f.parameters.len(), 1);
// Mutation rejected.
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
.await
.expect_err("mutation should be rejected");
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
}
#[tokio::test]
async fn test_set_lsm_write_spec_unsharded() {
use lance::dataset::mem_wal::DatasetMemWalExt;
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Lance's MemWAL still requires *some* unenforced primary key on
// the dataset; Unsharded just skips the per-row hashing step.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::unsharded())
.await
.unwrap();
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
let details = dataset
.mem_wal_index_details()
.await
.unwrap()
.expect("MemWAL index should be initialized");
assert_eq!(details.num_shards, 1);
assert_eq!(details.sharding_specs.len(), 1);
let f = &details.sharding_specs[0].fields[0];
assert_eq!(f.transform.as_deref(), Some("unsharded"));
assert!(f.source_ids.is_empty());
}
#[tokio::test]
async fn test_set_lsm_write_spec_identity() {
use lance::dataset::mem_wal::DatasetMemWalExt;
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("region", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(
LsmWriteSpec::identity("region")
.with_writer_config_defaults([("durable_write", "false")]),
)
.await
.unwrap();
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
let details = dataset
.mem_wal_index_details()
.await
.unwrap()
.expect("MemWAL index should be initialized");
// Identity sharding records an open-ended shard count.
assert_eq!(details.num_shards, 0);
assert_eq!(details.sharding_specs.len(), 1);
let f = &details.sharding_specs[0].fields[0];
assert_eq!(f.transform.as_deref(), Some("identity"));
// Writer config defaults round-trip into the MemWAL index.
assert_eq!(
details
.writer_config_defaults
.get("durable_write")
.map(String::as_str),
Some("false")
);
}
#[tokio::test]
async fn test_unset_lsm_write_spec() {
use lance::dataset::mem_wal::DatasetMemWalExt;
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
)
.unwrap();
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// unset errors when no spec is set.
table.unset_lsm_write_spec().await.unwrap_err();
// Install a spec, then unset it.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await
.unwrap();
{
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
}
table.unset_lsm_write_spec().await.unwrap();
{
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
assert!(dataset.mem_wal_index_details().await.unwrap().is_none());
}
// A second unset errors; a fresh spec can still be installed afterwards.
table.unset_lsm_write_spec().await.unwrap_err();
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
.await
.unwrap();
{
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
}
}
#[tokio::test]
pub async fn test_stats() {
let tmp_dir = tempdir().unwrap();

View File

@@ -16,6 +16,8 @@ use crate::error::{Error, Result};
use super::{BaseTable, NativeTable};
pub(crate) mod lsm;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct MergeResult {
// The commit version associated with the operation.

View File

@@ -0,0 +1,101 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! MemWAL LSM write-path spec management.
//!
//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a
//! table, which selects Lance's MemWAL LSM-style write path for future
//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual
//! `merge_insert` dispatch and writer are a follow-up.
use lance::dataset::mem_wal::DatasetMemWalExt;
use lance::index::DatasetIndexExt;
use crate::error::{Error, Result};
use crate::table::{LsmWriteSpec, NativeTable};
// =============================================================================
// set_lsm_write_spec
// =============================================================================
/// Install an [`LsmWriteSpec`] on the table.
///
/// The bucket / unsharded sharding spec is constructed and validated by Lance's
/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder).
#[allow(clippy::redundant_pub_crate)]
pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> {
table.dataset.ensure_mutable()?;
{
let dataset = table.dataset.get().await?;
if dataset.mem_wal_index_details().await?.is_some() {
return Err(Error::InvalidInput {
message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(),
});
}
}
let mut dataset = (*table.dataset.get().await?).clone();
let mut builder = dataset.initialize_mem_wal();
let (maintained_indexes, writer_config_defaults) = match spec {
LsmWriteSpec::Bucket {
column,
num_buckets,
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.bucket_sharding(column, num_buckets);
(maintained_indexes, writer_config_defaults)
}
LsmWriteSpec::Identity {
column,
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.identity_sharding(column);
(maintained_indexes, writer_config_defaults)
}
LsmWriteSpec::Unsharded {
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.unsharded();
(maintained_indexes, writer_config_defaults)
}
};
builder = builder.maintained_indexes(maintained_indexes);
for (key, value) in writer_config_defaults {
builder = builder.add_writer_config_default(key, value);
}
builder.execute().await?;
table.dataset.update(dataset);
Ok(())
}
// =============================================================================
// unset_lsm_write_spec
// =============================================================================
/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index.
///
/// Errors if no spec is currently set.
#[allow(clippy::redundant_pub_crate)]
pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> {
table.dataset.ensure_mutable()?;
{
let dataset = table.dataset.get().await?;
if dataset.mem_wal_index_details().await?.is_none() {
return Err(Error::InvalidInput {
message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(),
});
}
}
let mut dataset = (*table.dataset.get().await?).clone();
dataset
.drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME)
.await?;
table.dataset.update(dataset);
Ok(())
}

View File

@@ -0,0 +1,113 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Table-level unenforced primary key support.
//!
//! [`set_unenforced_primary_key`] records a column as the table's primary key
//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not
//! check uniqueness on write; the key is metadata for features such as
//! `merge_insert` to consume.
//!
//! Only a single-column primary key is supported, and the key cannot be
//! changed once set.
use arrow_schema::DataType;
use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION};
use crate::error::{Error, Result};
use crate::table::NativeTable;
/// Set the unenforced primary key on `table` to the single column in `columns`.
///
/// Fails if `columns` is not exactly one column (compound primary keys are not
/// supported), if the column does not exist or has an unsupported dtype, or if
/// the table already has an unenforced primary key (changing the primary key
/// is not supported).
pub(super) async fn set_unenforced_primary_key(
table: &NativeTable,
columns: &[&str],
) -> Result<()> {
table.dataset.ensure_mutable()?;
if columns.is_empty() {
return Err(Error::InvalidInput {
message: "set_unenforced_primary_key: a column is required".into(),
});
}
if columns.len() > 1 {
return Err(Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: compound primary keys are not supported, got {} columns",
columns.len()
),
});
}
let column = columns[0];
let updates = {
let dataset = table.dataset.get().await?;
let schema = dataset.schema();
// The primary key is immutable once set. The Lance commit layer is the
// source of truth for this (it also covers the concurrent-writer race);
// this check just fails fast with a clear message.
if !schema.unenforced_primary_key().is_empty() {
return Err(Error::InvalidInput {
message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(),
});
}
let field = schema.field(column).ok_or_else(|| Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: column '{}' not found on table",
column
),
})?;
if !is_supported_pk_dtype(&field.data_type()) {
return Err(Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary",
column,
field.data_type()
),
});
}
// Position metadata is 1-indexed; `Schema::unenforced_primary_key`
// treats position 0 as a legacy "no specific position" fallback.
let mut metadata = field.metadata.clone();
metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY);
metadata.insert(
LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
"1".to_string(),
);
vec![(field_id_to_u32(field.id, &field.name)?, metadata)]
};
let mut dataset = (*table.dataset.get().await?).clone();
dataset.replace_field_metadata(updates).await?;
table.dataset.update(dataset);
Ok(())
}
fn field_id_to_u32(id: i32, name: &str) -> Result<u32> {
u32::try_from(id).map_err(|_| Error::Runtime {
message: format!(
"internal: field '{}' has unexpected negative field id {}",
name, id
),
})
}
fn is_supported_pk_dtype(dtype: &DataType) -> bool {
matches!(
dtype,
DataType::Int32
| DataType::Int64
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Binary
| DataType::LargeBinary
| DataType::FixedSizeBinary(_)
)
}

View File

@@ -242,6 +242,10 @@ pub async fn create_plan(
scanner.disable_scoring_autoprojection();
}
if let Some(order_by) = &query.base.order_by {
scanner.order_by(Some(order_by.clone()))?;
}
Ok(scanner.create_plan().await?)
}