mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-23 15:00:39 +00:00
Compare commits
11 Commits
codex/upda
...
jack/idp-o
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c42848ae78 | ||
|
|
53c2164b84 | ||
|
|
6286ee8192 | ||
|
|
af8ca2ad5e | ||
|
|
aac6c62459 | ||
|
|
8df2fff75f | ||
|
|
0d30b31998 | ||
|
|
6a431ff0a0 | ||
|
|
ab2c5adf5e | ||
|
|
f02c4cad90 | ||
|
|
7b74c3dd91 |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.28.0-beta.11"
|
||||
current_version = "0.29.1-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
582
Cargo.lock
generated
582
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.29.1-beta.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -343,6 +343,30 @@ This is useful for pagination.
|
||||
|
||||
***
|
||||
|
||||
### orderBy()
|
||||
|
||||
```ts
|
||||
orderBy(ordering): this
|
||||
```
|
||||
|
||||
Sort the results by the specified column(s).
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
|
||||
|
||||
#### Returns
|
||||
|
||||
`this`
|
||||
|
||||
This query builder.
|
||||
|
||||
#### Inherited from
|
||||
|
||||
`StandardQueryBase.orderBy`
|
||||
|
||||
***
|
||||
|
||||
### outputSchema()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -690,6 +690,74 @@ of the given query
|
||||
|
||||
***
|
||||
|
||||
### setLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract setLsmWriteSpec(spec): Promise<void>
|
||||
```
|
||||
|
||||
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
|
||||
LSM-style write path for future `mergeInsert` calls.
|
||||
|
||||
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
|
||||
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
key (`column` and `numBuckets` required).
|
||||
- `"identity"` — shard by the raw value of a scalar `column`.
|
||||
- `"unsharded"` — route every write to a single shard.
|
||||
|
||||
All variants require the table to have an unenforced primary key
|
||||
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
|
||||
requires it to be the single column being bucketed.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
|
||||
The sharding spec to install.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 16,
|
||||
maintainedIndexes: ["id_idx"],
|
||||
});
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### setUnenforcedPrimaryKey()
|
||||
|
||||
```ts
|
||||
abstract setUnenforcedPrimaryKey(columns): Promise<void>
|
||||
```
|
||||
|
||||
Set the unenforced primary key for this table to a single column.
|
||||
|
||||
"Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
column is recorded in the schema as the primary key for use by features
|
||||
such as `mergeInsert`. Only single-column primary keys are supported,
|
||||
and the key cannot be changed once set.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **columns**: `string` \| `string`[]
|
||||
The primary key column. A one-element
|
||||
array is also accepted; passing more than one column is rejected.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### stats()
|
||||
|
||||
```ts
|
||||
@@ -793,6 +861,23 @@ Return the table as an arrow table
|
||||
|
||||
***
|
||||
|
||||
### unsetLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract unsetLsmWriteSpec(): Promise<void>
|
||||
```
|
||||
|
||||
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
|
||||
`mergeInsert` write path.
|
||||
|
||||
Errors if no spec is currently set.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### update()
|
||||
|
||||
#### update(opts)
|
||||
|
||||
@@ -498,6 +498,30 @@ This is useful for pagination.
|
||||
|
||||
***
|
||||
|
||||
### orderBy()
|
||||
|
||||
```ts
|
||||
orderBy(ordering): this
|
||||
```
|
||||
|
||||
Sort the results by the specified column(s).
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
|
||||
|
||||
#### Returns
|
||||
|
||||
`this`
|
||||
|
||||
This query builder.
|
||||
|
||||
#### Inherited from
|
||||
|
||||
`StandardQueryBase.orderBy`
|
||||
|
||||
***
|
||||
|
||||
### outputSchema()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
## Enumerations
|
||||
|
||||
- [FullTextQueryType](enumerations/FullTextQueryType.md)
|
||||
- [OAuthFlowType](enumerations/OAuthFlowType.md)
|
||||
- [Occur](enumerations/Occur.md)
|
||||
- [Operator](enumerations/Operator.md)
|
||||
|
||||
@@ -51,6 +52,7 @@
|
||||
- [AlterColumnsResult](interfaces/AlterColumnsResult.md)
|
||||
- [ClientConfig](interfaces/ClientConfig.md)
|
||||
- [ColumnAlteration](interfaces/ColumnAlteration.md)
|
||||
- [ColumnOrdering](interfaces/ColumnOrdering.md)
|
||||
- [CompactionStats](interfaces/CompactionStats.md)
|
||||
- [ConnectNamespaceOptions](interfaces/ConnectNamespaceOptions.md)
|
||||
- [ConnectionOptions](interfaces/ConnectionOptions.md)
|
||||
@@ -79,7 +81,10 @@
|
||||
- [IvfRqOptions](interfaces/IvfRqOptions.md)
|
||||
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
|
||||
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
|
||||
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
|
||||
- [MergeResult](interfaces/MergeResult.md)
|
||||
- [NativeOAuthConfig](interfaces/NativeOAuthConfig.md)
|
||||
- [OAuthConfig](interfaces/OAuthConfig.md)
|
||||
- [OpenTableOptions](interfaces/OpenTableOptions.md)
|
||||
- [OptimizeOptions](interfaces/OptimizeOptions.md)
|
||||
- [OptimizeStats](interfaces/OptimizeStats.md)
|
||||
|
||||
31
docs/src/js/interfaces/ColumnOrdering.md
Normal file
31
docs/src/js/interfaces/ColumnOrdering.md
Normal file
@@ -0,0 +1,31 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / ColumnOrdering
|
||||
|
||||
# Interface: ColumnOrdering
|
||||
|
||||
## Properties
|
||||
|
||||
### ascending?
|
||||
|
||||
```ts
|
||||
optional ascending: boolean;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### columnName
|
||||
|
||||
```ts
|
||||
columnName: string;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### nullsFirst?
|
||||
|
||||
```ts
|
||||
optional nullsFirst: boolean;
|
||||
```
|
||||
@@ -64,6 +64,18 @@ client used by manifest-enabled native connections.
|
||||
|
||||
***
|
||||
|
||||
### oauthConfig?
|
||||
|
||||
```ts
|
||||
optional oauthConfig: NativeOAuthConfig;
|
||||
```
|
||||
|
||||
(For LanceDB cloud only): OAuth configuration for IdP-based
|
||||
authentication (e.g., Azure Entra ID). When set, token acquisition
|
||||
and refresh are handled entirely in Rust.
|
||||
|
||||
***
|
||||
|
||||
### readConsistencyInterval?
|
||||
|
||||
```ts
|
||||
|
||||
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
@@ -0,0 +1,64 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
|
||||
|
||||
# Interface: LsmWriteSpec
|
||||
|
||||
Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`mergeInsert`.
|
||||
|
||||
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
`column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
required.
|
||||
|
||||
## Properties
|
||||
|
||||
### column?
|
||||
|
||||
```ts
|
||||
optional column: string;
|
||||
```
|
||||
|
||||
Bucket and identity variants: the sharding column.
|
||||
|
||||
***
|
||||
|
||||
### maintainedIndexes?
|
||||
|
||||
```ts
|
||||
optional maintainedIndexes: string[];
|
||||
```
|
||||
|
||||
Names of indexes the MemWAL should keep up to date during writes.
|
||||
|
||||
***
|
||||
|
||||
### numBuckets?
|
||||
|
||||
```ts
|
||||
optional numBuckets: number;
|
||||
```
|
||||
|
||||
Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
|
||||
***
|
||||
|
||||
### specType
|
||||
|
||||
```ts
|
||||
specType: "bucket" | "identity" | "unsharded";
|
||||
```
|
||||
|
||||
One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
|
||||
***
|
||||
|
||||
### writerConfigDefaults?
|
||||
|
||||
```ts
|
||||
optional writerConfigDefaults: Record<string, string>;
|
||||
```
|
||||
|
||||
Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.0.0-beta.7</lance-core.version>
|
||||
<lance-core.version>7.0.0-beta.13</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.28.0-beta.11"
|
||||
version = "0.29.1-beta.0"
|
||||
publish = false
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
|
||||
@@ -109,3 +109,209 @@ describe("Query outputSchema", () => {
|
||||
expect(schema.fields.length).toBe(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Query orderBy", () => {
  let tmpDir: tmp.DirResult;
  let table: Table;

  /**
   * Assert that `rows` carries exactly the `expected` scores, in order.
   *
   * The second argument of `toBeCloseTo` is a number of decimal digits, not an
   * absolute tolerance. Three digits means `|received - expected| < 0.0005`,
   * which absorbs Float32 rounding while still telling the fixture scores apart.
   */
  function expectScores(
    rows: Array<{ score: number }>,
    expected: number[],
  ): void {
    expect(rows.length).toBe(expected.length);
    expected.forEach((score, i) => {
      expect(rows[i].score).toBeCloseTo(score, 3);
    });
  }

  beforeEach(async () => {
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
    const db = await connect(tmpDir.name);

    // Create table with numeric and string data for sorting
    const schema = new Schema([
      new Field("id", new Int64(), true),
      new Field("score", new Float32(), true),
      new Field("name", new Utf8(), true),
    ]);

    const data = makeArrowTable(
      [
        { id: 1n, score: 3.5, name: "charlie" },
        { id: 2n, score: 1.2, name: "alice" },
        { id: 3n, score: 2.8, name: "bob" },
        { id: 4n, score: 0.5, name: "david" },
        { id: 5n, score: 4.1, name: "eve" },
      ],
      { schema },
    );
    table = await db.createTable("test", data);
  });

  afterEach(() => {
    tmpDir.removeCallback();
  });

  it("should sort by single column ascending", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "score", ascending: true, nullsFirst: false })
      .toArray();

    expectScores(results, [0.5, 1.2, 2.8, 3.5, 4.1]);
  });

  it("should sort by single column descending", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "score", ascending: false, nullsFirst: false })
      .toArray();

    expectScores(results, [4.1, 3.5, 2.8, 1.2, 0.5]);
  });

  it("should use ascending as default direction", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "score" })
      .toArray();

    // No `ascending` given, so the default (ascending) applies.
    expectScores(results, [0.5, 1.2, 2.8, 3.5, 4.1]);
  });

  it("should sort by string column", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "name" })
      .toArray();

    expect(results.length).toBe(5);
    // Verify alphabetical order
    expect(results[0].name).toBe("alice");
    expect(results[1].name).toBe("bob");
    expect(results[2].name).toBe("charlie");
    expect(results[3].name).toBe("david");
    expect(results[4].name).toBe("eve");
  });

  it("should support method chaining with where", async () => {
    const results = await table
      .query()
      .where("score > 2.0")
      .orderBy({ columnName: "score" })
      .toArray();

    // Filtered, then sorted ascending.
    expectScores(results, [2.8, 3.5, 4.1]);
  });

  it("should support method chaining with limit", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "score", ascending: false })
      .limit(3)
      .toArray();

    // Top 3 in descending order.
    expectScores(results, [4.1, 3.5, 2.8]);
  });

  it("should support method chaining with offset", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "score" })
      .offset(2)
      .limit(2)
      .toArray();

    // Skips the two lowest scores and takes the next two.
    expectScores(results, [2.8, 3.5]);
  });

  it("should support method chaining with select", async () => {
    const results = await table
      .query()
      .orderBy({ columnName: "name" })
      .select(["name", "score"])
      .toArray();

    expect(results.length).toBe(5);
    // Verify only selected columns are present
    expect(Object.keys(results[0])).toEqual(["name", "score"]);
    expect(Object.keys(results[4])).toEqual(["name", "score"]);
    // Verify sorted by name
    expect(results[0].name).toBe("alice");
    expect(results[4].name).toBe("eve");
  });

  it("should support complex method chaining", async () => {
    const results = await table
      .query()
      .where("score > 1.0")
      .orderBy({ columnName: "score", ascending: false })
      .limit(3)
      .select(["id", "score", "name"])
      .toArray();

    // Filtered, sorted, limited, and projected.
    expectScores(results, [4.1, 3.5, 2.8]);
    expect(Object.keys(results[0])).toEqual(["id", "score", "name"]);
  });

  it("should support multi-column ordering and null placement", async () => {
    const schema = new Schema([
      new Field("group", new Int64(), true),
      new Field("score", new Float32(), true),
      new Field("name", new Utf8(), true),
    ]);

    const data = makeArrowTable(
      [
        { group: 1n, score: null, name: "z" },
        { group: 1n, score: 1.0, name: "b" },
        { group: 1n, score: 1.0, name: "a" },
        { group: 2n, score: 0.5, name: "c" },
      ],
      { schema },
    );
    const nullTable = await (await connect(tmpDir.name)).createTable(
      "test_multi_order",
      data,
      { mode: "overwrite" },
    );

    const results = await nullTable
      .query()
      .orderBy([
        { columnName: "group", ascending: true, nullsFirst: false },
        { columnName: "score", ascending: true, nullsFirst: true },
        { columnName: "name", ascending: true, nullsFirst: false },
      ])
      .toArray();

    // Within group 1 the null score comes first, then ties on score break by name.
    expect(results.map((r) => [r.group, r.score, r.name])).toEqual([
      [1n, null, "z"],
      [1n, 1.0, "a"],
      [1n, 1.0, "b"],
      [2n, 0.5, "c"],
    ]);
  });
});
|
||||
|
||||
@@ -2348,3 +2348,130 @@ describe("when creating a table with Float32Array vectors", () => {
|
||||
expect((fsl.children[0].type as Float32).precision).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("setUnenforcedPrimaryKey", () => {
  let tmpDir: tmp.DirResult;

  beforeEach(() => {
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
  });
  afterEach(() => {
    tmpDir.removeCallback();
  });

  // Non-nullable columns shared by the schemas below.
  const idField = () => new arrow.Field("id", new arrow.Int64(), false);
  const nameField = () => new arrow.Field("name", new arrow.Utf8(), false);

  it("sets a single-column primary key (string or one-element array)", async () => {
    const conn = await connect(tmpDir.name);
    const idOnly = new arrow.Schema([idField()]);

    // Plain string form.
    const first = await conn.createEmptyTable("t1", idOnly);
    await first.setUnenforcedPrimaryKey("id");

    // One-element array form.
    const second = await conn.createEmptyTable("t2", idOnly);
    await second.setUnenforcedPrimaryKey(["id"]);
  });

  it("rejects a compound primary key", async () => {
    const conn = await connect(tmpDir.name);
    const idAndName = new arrow.Schema([idField(), nameField()]);
    const table = await conn.createEmptyTable("t", idAndName);

    const attempt = table.setUnenforcedPrimaryKey(["id", "name"]);
    await expect(attempt).rejects.toThrow();
  });

  it("rejects changing the primary key once set", async () => {
    const conn = await connect(tmpDir.name);
    const idAndName = new arrow.Schema([idField(), nameField()]);
    const table = await conn.createEmptyTable("t", idAndName);

    await table.setUnenforcedPrimaryKey("id");
    // Neither a different column nor the same column may be set again.
    await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
    await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
  });
});
|
||||
|
||||
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
  let tmpDir: tmp.DirResult;

  beforeEach(() => {
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
  });
  afterEach(() => {
    tmpDir.removeCallback();
  });

  // Create an empty table "t" with a single non-nullable Int64 `id` column.
  const makeTable = async (conn: Connection): Promise<Table> => {
    const idOnly = new arrow.Schema([
      new arrow.Field("id", new arrow.Int64(), false),
    ]);
    return await conn.createEmptyTable("t", idOnly);
  };

  // Every test starts from table "t" with `id` as its unenforced primary key.
  const makeKeyedTable = async (): Promise<Table> => {
    const conn = await connect(tmpDir.name);
    const table = await makeTable(conn);
    await table.setUnenforcedPrimaryKey("id");
    return table;
  };

  it("installs and removes a bucket spec", async () => {
    const table = await makeKeyedTable();

    await table.setLsmWriteSpec({
      specType: "bucket",
      column: "id",
      numBuckets: 4,
    });
    await table.unsetLsmWriteSpec();

    // Unsetting again must fail: nothing is installed any more.
    await expect(table.unsetLsmWriteSpec()).rejects.toThrow();

    // After the unset, a new spec can be installed.
    await table.setLsmWriteSpec({
      specType: "bucket",
      column: "id",
      numBuckets: 8,
    });
  });

  it("installs an unsharded spec", async () => {
    const table = await makeKeyedTable();

    await table.setLsmWriteSpec({ specType: "unsharded" });
    await table.unsetLsmWriteSpec();
  });

  it("installs an identity spec", async () => {
    const table = await makeKeyedTable();

    await table.setLsmWriteSpec({ specType: "identity", column: "id" });
    await table.unsetLsmWriteSpec();
  });

  it("rejects an invalid spec", async () => {
    const table = await makeKeyedTable();

    // numBuckets of 0 is out of range.
    const zeroBuckets = table.setLsmWriteSpec({
      specType: "bucket",
      column: "id",
      numBuckets: 0,
    });
    await expect(zeroBuckets).rejects.toThrow();

    // The bucketed column is not the table's `id` column.
    const wrongColumn = table.setLsmWriteSpec({
      specType: "bucket",
      column: "missing",
      numBuckets: 4,
    });
    await expect(wrongColumn).rejects.toThrow();
  });
});
|
||||
|
||||
@@ -38,5 +38,14 @@ test("filtering examples", async () => {
|
||||
// --8<-- [start:sql_search]
|
||||
await tbl.query().where("id = 10").limit(10).toArray();
|
||||
// --8<-- [end:sql_search]
|
||||
|
||||
// --8<-- [start:orderby_search]
|
||||
await tbl
|
||||
.query()
|
||||
.where("id > 10")
|
||||
.orderBy({ columnName: "id", ascending: false })
|
||||
.limit(5)
|
||||
.toArray();
|
||||
// --8<-- [end:orderby_search]
|
||||
});
|
||||
});
|
||||
|
||||
@@ -50,6 +50,7 @@ export {
|
||||
SplitHashOptions,
|
||||
SplitSequentialOptions,
|
||||
ShuffleOptions,
|
||||
OAuthConfig as NativeOAuthConfig,
|
||||
} from "./native.js";
|
||||
|
||||
export {
|
||||
@@ -82,6 +83,7 @@ export {
|
||||
VectorQuery,
|
||||
TakeQuery,
|
||||
QueryExecutionOptions,
|
||||
ColumnOrdering,
|
||||
FullTextSearchOptions,
|
||||
RecordBatchIterator,
|
||||
FullTextQuery,
|
||||
@@ -112,6 +114,7 @@ export {
|
||||
UpdateOptions,
|
||||
OptimizeOptions,
|
||||
Version,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
} from "./table";
|
||||
|
||||
@@ -122,6 +125,8 @@ export {
|
||||
TokenResponse,
|
||||
} from "./header";
|
||||
|
||||
export { OAuthConfig, OAuthFlowType } from "./oauth";
|
||||
|
||||
export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
|
||||
|
||||
export * as embedding from "./embedding";
|
||||
|
||||
82
nodejs/lancedb/oauth.ts
Normal file
82
nodejs/lancedb/oauth.ts
Normal file
@@ -0,0 +1,82 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
/**
 * The OAuth flows that can be selected through `OAuthConfig.flow`.
 *
 * Each member's value is the string identifier of the flow.
 */
export enum OAuthFlowType {
  /** Service-to-service (machine-to-machine) Client Credentials grant. */
  ClientCredentials = "client_credentials",
  /** Interactive, browser-based Authorization Code grant with PKCE. */
  AuthorizationCodePKCE = "authorization_code_pkce",
  /** Device Code grant, for CLI and headless environments. */
  DeviceCode = "device_code",
  /** Azure Managed Identity, obtained via IMDS. */
  AzureManagedIdentity = "azure_managed_identity",
  /** Workload Identity Federation (e.g. Kubernetes, GitHub Actions). */
  WorkloadIdentity = "workload_identity",
}
|
||||
|
||||
/**
 * Settings describing how LanceDB should authenticate via OAuth.
 *
 * The object is handed to the Rust layer through napi-rs; acquiring and
 * refreshing tokens happens entirely there.
 *
 * @example Client Credentials (service-to-service):
 * ```typescript
 * const config: OAuthConfig = {
 *   issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
 *   clientId: "app-id",
 *   clientSecret: "secret",
 *   scopes: ["api://lancedb-api/.default"],
 * };
 * ```
 *
 * @example Azure Managed Identity:
 * ```typescript
 * const config: OAuthConfig = {
 *   issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
 *   clientId: "app-id",
 *   scopes: ["api://lancedb-api/.default"],
 *   flow: OAuthFlowType.AzureManagedIdentity,
 * };
 * ```
 */
export interface OAuthConfig {
  /**
   * URL of the OIDC issuer or OAuth authority.
   * Azure form: `https://login.microsoftonline.com/{tenant_id}/v2.0`
   */
  issuerUrl: string;

  /** The application (client) ID. */
  clientId: string;

  /**
   * Scopes requested for the token.
   * Azure form: `["api://{app_id}/.default"]`
   */
  scopes: string[];

  /** Which flow to authenticate with (default: ClientCredentials). */
  flow?: OAuthFlowType;

  /** Secret for the client; required by the ClientCredentials flow. */
  clientSecret?: string;

  /** Redirect URI, used by the AuthorizationCodePKCE flow. */
  redirectUri?: string;

  /** Local callback server port (AuthorizationCodePKCE, default: 8400). */
  callbackPort?: number;

  /** Client ID of a user-assigned managed identity (AzureManagedIdentity). */
  managedIdentityClientId?: string;

  /** Location of the federated token file (WorkloadIdentity). */
  tokenFile?: string;

  /** How many seconds before expiry a proactive refresh is triggered (default: 300). */
  refreshBufferSecs?: number;
}
|
||||
@@ -79,6 +79,12 @@ export interface QueryExecutionOptions {
|
||||
timeoutMs?: number;
|
||||
}
|
||||
|
||||
/**
 * One sort key for `orderBy`: a column plus optional direction and null
 * placement.
 */
export interface ColumnOrdering {
  /** Name of the column to sort by. */
  columnName: string;
  /** Sort ascending when `true`; `orderBy` treats an omitted value as `true`. */
  ascending?: boolean;
  /** Place nulls first when `true`; `orderBy` treats an omitted value as `false`. */
  nullsFirst?: boolean;
}
|
||||
|
||||
/**
|
||||
* Options that control the behavior of a full text search
|
||||
*/
|
||||
@@ -417,6 +423,21 @@ export class StandardQueryBase<
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
 * Order the query results by one or more columns.
 *
 * @param ordering A single {@link ColumnOrdering} or an array of them. An
 * omitted `ascending` is passed on as `true` and an omitted `nullsFirst` as
 * `false`.
 * @returns This query builder.
 */
orderBy(ordering: ColumnOrdering | ColumnOrdering[]): this {
  // Fill in the optional fields so the inner query always gets explicit values.
  const withDefaults = (entry: ColumnOrdering) => {
    const ascending = entry.ascending ?? true;
    const nullsFirst = entry.nullsFirst ?? false;
    return { columnName: entry.columnName, ascending, nullsFirst };
  };

  const requested = Array.isArray(ordering) ? ordering : [ordering];
  const normalized = requested.map((entry) => withDefaults(entry));
  this.doCall((inner) => inner.orderBy(normalized));
  return this;
}
|
||||
|
||||
/**
|
||||
* Skip searching un-indexed data. This can make search faster, but will miss
|
||||
* any data that is not yet indexed.
|
||||
|
||||
@@ -106,6 +106,27 @@ export interface Version {
|
||||
metadata: Record<string, string>;
|
||||
}
|
||||
|
||||
/**
 * Describes how `mergeInsert` writes are routed through Lance's MemWAL
 * LSM-style write path.
 *
 * Which optional fields are required depends on `specType`:
 *
 * - `"bucket"` needs both `column` and `numBuckets`.
 * - `"identity"` needs `column`.
 * - `"unsharded"` needs neither.
 */
export interface LsmWriteSpec {
  /** The variant: `"bucket"`, `"identity"`, or `"unsharded"`. */
  specType: "bucket" | "identity" | "unsharded";
  /** The sharding column (bucket and identity variants). */
  column?: string;
  /** How many buckets to use, in `[1, 1024]` (bucket variant). */
  numBuckets?: number;
  /** Index names that the MemWAL should keep up to date while writing. */
  maintainedIndexes?: string[];
  /** Default `ShardWriter` configuration, recorded in the MemWAL index. */
  writerConfigDefaults?: Record<string, string>;
}
|
||||
|
||||
/**
|
||||
* A Table is a collection of Records in a LanceDB Database.
|
||||
*
|
||||
@@ -449,6 +470,54 @@ export abstract class Table {
|
||||
* containing the new version number of the table after dropping the columns.
|
||||
*/
|
||||
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
|
||||
/**
|
||||
* Set the unenforced primary key for this table to a single column.
|
||||
*
|
||||
* "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
* column is recorded in the schema as the primary key for use by features
|
||||
* such as `mergeInsert`. Only single-column primary keys are supported,
|
||||
* and the key cannot be changed once set.
|
||||
* @param {string | string[]} columns The primary key column. A one-element
|
||||
* array is also accepted; passing more than one column is rejected.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
|
||||
/**
|
||||
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
|
||||
* LSM-style write path for future `mergeInsert` calls.
|
||||
*
|
||||
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
*
|
||||
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
* key (`column` and `numBuckets` required).
|
||||
* - `"identity"` — shard by the raw value of a scalar `column`.
|
||||
* - `"unsharded"` — route every write to a single shard.
|
||||
*
|
||||
* All variants require the table to have an unenforced primary key
|
||||
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
|
||||
* requires it to be the single column being bucketed.
|
||||
* @param {LsmWriteSpec} spec The sharding spec to install.
|
||||
* @returns {Promise<void>}
|
||||
* @example
|
||||
* ```ts
|
||||
* await table.setUnenforcedPrimaryKey("id");
|
||||
* await table.setLsmWriteSpec({
|
||||
* specType: "bucket",
|
||||
* column: "id",
|
||||
* numBuckets: 16,
|
||||
* maintainedIndexes: ["id_idx"],
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
|
||||
/**
|
||||
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
|
||||
* `mergeInsert` write path.
|
||||
*
|
||||
* Errors if no spec is currently set.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract unsetLsmWriteSpec(): Promise<void>;
|
||||
/** Retrieve the version of the table */
|
||||
|
||||
abstract version(): Promise<number>;
|
||||
@@ -897,6 +966,19 @@ export class LocalTable extends Table {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
|
||||
/**
 * Set the unenforced primary key, accepting either a column name or an
 * array of column names. The native binding always receives an array.
 */
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
  const columnList = Array.isArray(columns) ? columns : [columns];
  return await this.inner.setUnenforcedPrimaryKey(columnList);
}
|
||||
|
||||
/** Forward the given spec to the native table's `setLsmWriteSpec`. */
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
  const pending = this.inner.setLsmWriteSpec(spec);
  return await pending;
}
|
||||
|
||||
/** Forward to the native table's `unsetLsmWriteSpec`. */
async unsetLsmWriteSpec(): Promise<void> {
  const pending = this.inner.unsetLsmWriteSpec();
  return await pending;
}
|
||||
|
||||
/** Return the version number reported by the native table. */
async version(): Promise<number> {
  const current = await this.inner.version();
  return current;
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
11029
nodejs/package-lock.json
generated
Normal file
11029
nodejs/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.29.1-beta.0",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -112,6 +112,11 @@ impl Connection {
|
||||
|
||||
builder = builder.client_config(rust_config);
|
||||
|
||||
if let Some(oauth_config) = options.oauth_config {
|
||||
let config: lancedb::remote::oauth::OAuthConfig = oauth_config.into();
|
||||
builder = builder.oauth_config(config);
|
||||
}
|
||||
|
||||
if let Some(api_key) = options.api_key {
|
||||
builder = builder.api_key(&api_key);
|
||||
}
|
||||
|
||||
@@ -61,6 +61,10 @@ pub struct ConnectionOptions {
|
||||
/// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
|
||||
/// for testing purposes.
|
||||
pub host_override: Option<String>,
|
||||
/// (For LanceDB cloud only): OAuth configuration for IdP-based
|
||||
/// authentication (e.g., Azure Entra ID). When set, token acquisition
|
||||
/// and refresh are handled entirely in Rust.
|
||||
pub oauth_config: Option<remote::OAuthConfig>,
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
|
||||
@@ -3,6 +3,12 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
use crate::error::convert_error;
|
||||
use crate::iterator::RecordBatchIterator;
|
||||
use crate::rerankers::RerankHybridCallbackArgs;
|
||||
use crate::rerankers::Reranker;
|
||||
use crate::util::{parse_distance_type, schema_to_buffer};
|
||||
use arrow_array::{
|
||||
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
|
||||
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
|
||||
@@ -19,16 +25,27 @@ use lancedb::query::QueryBase;
|
||||
use lancedb::query::QueryExecutionOptions;
|
||||
use lancedb::query::Select;
|
||||
use lancedb::query::TakeQuery as LanceDbTakeQuery;
|
||||
use lancedb::query::VectorQuery as LanceDbVectorQuery;
|
||||
use lancedb::query::{ColumnOrdering as LanceDbColumnOrdering, VectorQuery as LanceDbVectorQuery};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi_derive::napi;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
use crate::error::convert_error;
|
||||
use crate::iterator::RecordBatchIterator;
|
||||
use crate::rerankers::RerankHybridCallbackArgs;
|
||||
use crate::rerankers::Reranker;
|
||||
use crate::util::{parse_distance_type, schema_to_buffer};
|
||||
#[napi(object)]
|
||||
pub struct ColumnOrdering {
|
||||
pub ascending: bool,
|
||||
pub nulls_first: bool,
|
||||
pub column_name: String,
|
||||
}
|
||||
|
||||
impl From<ColumnOrdering> for LanceDbColumnOrdering {
|
||||
fn from(value: ColumnOrdering) -> Self {
|
||||
match (value.ascending, value.nulls_first) {
|
||||
(true, true) => Self::asc_nulls_first(value.column_name),
|
||||
(true, false) => Self::asc_nulls_last(value.column_name),
|
||||
(false, true) => Self::desc_nulls_first(value.column_name),
|
||||
(false, false) => Self::desc_nulls_last(value.column_name),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
|
||||
let buf = arrow_buffer::Buffer::from(data.to_vec());
|
||||
@@ -128,6 +145,18 @@ impl Query {
|
||||
self.inner = self.inner.clone().with_row_id();
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
|
||||
let ordering = ordering.map(|ordering| {
|
||||
ordering
|
||||
.into_iter()
|
||||
.map(LanceDbColumnOrdering::from)
|
||||
.collect()
|
||||
});
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn output_schema(&self) -> napi::Result<Buffer> {
|
||||
let schema = self.inner.output_schema().await.default_error()?;
|
||||
@@ -328,6 +357,18 @@ impl VectorQuery {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
|
||||
let ordering = ordering.map(|ordering| {
|
||||
ordering
|
||||
.into_iter()
|
||||
.map(LanceDbColumnOrdering::from)
|
||||
.collect()
|
||||
});
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn output_schema(&self) -> napi::Result<Buffer> {
|
||||
let schema = self.inner.output_schema().await.default_error()?;
|
||||
|
||||
@@ -140,6 +140,67 @@ impl From<TlsConfig> for lancedb::remote::TlsConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// OAuth configuration for LanceDB authentication.
|
||||
/// All token acquisition and refresh is handled in the Rust layer.
|
||||
#[napi(object)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OAuthConfig {
|
||||
/// OIDC issuer URL or OAuth authority URL.
|
||||
/// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
|
||||
pub issuer_url: String,
|
||||
/// Application / Client ID.
|
||||
pub client_id: String,
|
||||
/// OAuth scopes to request. For Azure: `["api://{app_id}/.default"]`
|
||||
pub scopes: Vec<String>,
|
||||
/// Authentication flow: "client_credentials", "authorization_code_pkce",
|
||||
/// "device_code", "azure_managed_identity", "workload_identity"
|
||||
pub flow: Option<String>,
|
||||
/// Client secret (required for client_credentials).
|
||||
pub client_secret: Option<String>,
|
||||
/// Redirect URI (authorization_code_pkce flow).
|
||||
pub redirect_uri: Option<String>,
|
||||
/// Port for local callback server (authorization_code_pkce, default: 8400).
|
||||
pub callback_port: Option<u16>,
|
||||
/// Client ID for user-assigned managed identity (azure_managed_identity).
|
||||
pub managed_identity_client_id: Option<String>,
|
||||
/// Path to federated token file (workload_identity).
|
||||
pub token_file: Option<String>,
|
||||
/// Seconds before expiry to trigger proactive refresh (default: 300).
|
||||
pub refresh_buffer_secs: Option<u32>,
|
||||
}
|
||||
|
||||
impl From<OAuthConfig> for lancedb::remote::oauth::OAuthConfig {
    /// Converts the JavaScript-facing OAuth options into the core LanceDB
    /// configuration.
    ///
    /// `flow` is matched by its string name; an omitted `flow` selects
    /// `"client_credentials"`. Options that belong to a flow other than the
    /// selected one are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `flow` is not one of the recognized names, or if the
    /// `"workload_identity"` flow is selected without `token_file`.
    fn from(config: OAuthConfig) -> Self {
        use lancedb::remote::oauth::OAuthFlow;

        let flow = match config.flow.as_deref().unwrap_or("client_credentials") {
            // Without this arm the default flow name fell through to the
            // catch-all below and panicked as "unknown".
            // NOTE(review): assumes the core enum exposes a unit variant named
            // `ClientCredentials` — confirm against `lancedb::remote::oauth`.
            "client_credentials" => OAuthFlow::ClientCredentials,
            "authorization_code_pkce" => OAuthFlow::AuthorizationCodePKCE {
                redirect_uri: config.redirect_uri,
                callback_port: config.callback_port,
            },
            "device_code" => OAuthFlow::DeviceCode,
            "azure_managed_identity" => OAuthFlow::AzureManagedIdentity {
                client_id: config.managed_identity_client_id,
            },
            "workload_identity" => OAuthFlow::WorkloadIdentity {
                token_file: config
                    .token_file
                    .expect("tokenFile is required for workload_identity flow"),
            },
            other => panic!("Unknown OAuth flow type: {other}"),
        };

        Self {
            issuer_url: config.issuer_url,
            client_id: config.client_id,
            client_secret: config.client_secret,
            scopes: config.scopes,
            flow,
            // u32 -> u64 is lossless; `u64::from` states that without a cast.
            refresh_buffer_secs: config.refresh_buffer_secs.map(u64::from),
        }
    }
}
|
||||
|
||||
impl From<ClientConfig> for lancedb::remote::ClientConfig {
|
||||
fn from(config: ClientConfig) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -344,6 +344,31 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
|
||||
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
|
||||
self.inner_ref()?
|
||||
.set_lsm_write_spec(native_spec)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.unset_lsm_write_spec()
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn version(&self) -> napi::Result<i64> {
|
||||
self.inner_ref()?
|
||||
@@ -538,6 +563,63 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `mergeInsert`.
|
||||
///
|
||||
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
|
||||
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
|
||||
/// `column` is required.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
pub spec_type: String,
|
||||
/// Bucket and identity variants: the sharding column.
|
||||
pub column: Option<String>,
|
||||
/// Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
pub num_buckets: Option<u32>,
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
pub maintained_indexes: Option<Vec<String>>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
pub writer_config_defaults: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
type Error = napi::Error;
|
||||
|
||||
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
|
||||
let maintained = value.maintained_indexes.unwrap_or_default();
|
||||
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
|
||||
let spec = match value.spec_type.as_str() {
|
||||
"bucket" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
|
||||
})?;
|
||||
let num_buckets = value.num_buckets.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
|
||||
})?;
|
||||
Self::bucket(column, num_buckets)
|
||||
}
|
||||
"identity" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
|
||||
})?;
|
||||
Self::identity(column)
|
||||
}
|
||||
"unsharded" => Self::unsharded(),
|
||||
other => {
|
||||
return Err(napi::Error::from_reason(format!(
|
||||
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
|
||||
other
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(spec
|
||||
.with_maintained_indexes(maintained)
|
||||
.with_writer_config_defaults(writer_config_defaults))
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics about a compaction operation.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.31.0-beta.11"
|
||||
current_version = "0.32.1-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.31.0-beta.11"
|
||||
version = "0.32.1-beta.0"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
|
||||
@@ -320,6 +320,7 @@ async def connect_async(
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
oauth_config=None,
|
||||
) -> AsyncConnection:
|
||||
"""Connect to a LanceDB database.
|
||||
|
||||
@@ -410,6 +411,7 @@ async def connect_async(
|
||||
session,
|
||||
manifest_enabled,
|
||||
namespace_client_properties,
|
||||
oauth_config,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -217,6 +217,9 @@ class Table:
|
||||
async def uri(self) -> str: ...
|
||||
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
|
||||
async def unset_lsm_write_spec(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
@@ -247,6 +250,7 @@ async def connect(
|
||||
session: Optional[Session],
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
oauth_config: Optional[Any] = None,
|
||||
) -> Connection: ...
|
||||
|
||||
class RecordBatchStream:
|
||||
@@ -255,6 +259,11 @@ class RecordBatchStream:
|
||||
def __aiter__(self) -> "RecordBatchStream": ...
|
||||
async def __anext__(self) -> pa.RecordBatch: ...
|
||||
|
||||
class ColumnOrdering(TypedDict):
|
||||
column_name: str
|
||||
ascending: bool
|
||||
nulls_first: bool
|
||||
|
||||
class Query:
|
||||
def where(self, filter: str): ...
|
||||
def where_expr(self, expr: PyExpr): ...
|
||||
@@ -268,6 +277,7 @@ class Query:
|
||||
def postfilter(self): ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
|
||||
def nearest_to_text(self, query: dict) -> FTSQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
async def output_schema(self) -> pa.Schema: ...
|
||||
async def execute(
|
||||
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
|
||||
@@ -296,6 +306,7 @@ class FTSQuery:
|
||||
def get_query(self) -> str: ...
|
||||
def add_query_vector(self, query_vec: pa.Array) -> None: ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
async def output_schema(self) -> pa.Schema: ...
|
||||
async def execute(
|
||||
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
|
||||
@@ -321,6 +332,7 @@ class VectorQuery:
|
||||
def maximum_nprobes(self, maximum_nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def nearest_to_text(self, query: dict) -> HybridQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
def to_query_request(self) -> PyQueryRequest: ...
|
||||
|
||||
class HybridQuery:
|
||||
@@ -339,6 +351,7 @@ class HybridQuery:
|
||||
def minimum_nprobes(self, minimum_nprobes: int): ...
|
||||
def maximum_nprobes(self, maximum_nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
def to_vector_query(self) -> VectorQuery: ...
|
||||
def to_fts_query(self) -> FTSQuery: ...
|
||||
def get_limit(self) -> int: ...
|
||||
@@ -368,6 +381,7 @@ class PyQueryRequest:
|
||||
bypass_vector_index: Optional[bool]
|
||||
postfilter: Optional[bool]
|
||||
norm: Optional[str]
|
||||
order_by: Optional[List[ColumnOrdering]]
|
||||
|
||||
class CompactionStats:
|
||||
fragments_removed: int
|
||||
@@ -408,6 +422,37 @@ class MergeResult:
|
||||
num_deleted_rows: int
|
||||
num_attempts: int
|
||||
|
||||
class LsmWriteSpec:
|
||||
"""Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`merge_insert`."""
|
||||
|
||||
@staticmethod
|
||||
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def identity(column: str) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def unsharded() -> "LsmWriteSpec": ...
|
||||
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec asking the MemWAL to keep the named
|
||||
indexes up to date as rows are appended."""
|
||||
...
|
||||
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec recording the given default
|
||||
`ShardWriter` configuration in the MemWAL index."""
|
||||
...
|
||||
@property
|
||||
def spec_type(self) -> str:
|
||||
"""One of 'bucket', 'identity', or 'unsharded'."""
|
||||
...
|
||||
@property
|
||||
def column(self) -> Optional[str]: ...
|
||||
@property
|
||||
def num_buckets(self) -> Optional[int]: ...
|
||||
@property
|
||||
def maintained_indexes(self) -> List[str]: ...
|
||||
@property
|
||||
def writer_config_defaults(self) -> Dict[str, str]: ...
|
||||
|
||||
class AddColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -968,22 +968,32 @@ class Permutation:
|
||||
new.transform_fn = transform
|
||||
return new
|
||||
|
||||
def take_offsets(self, offsets: list[int]) -> Any:
    """
    Fetch the rows at the given offsets.

    Rows are read through ``self.reader.take_offsets`` using the
    permutation's current selection, and the resulting batch is passed to
    ``self.transform_fn`` before being returned. `with_format` and
    `with_transform` therefore affect this method the same way they affect
    iteration.
    """

    async def _fetch():
        rows = await self.reader.take_offsets(offsets, selection=self.selection)
        return rows

    return self.transform_fn(LOOP.run(_fetch()))
|
||||
|
||||
def __getitem__(self, index: int) -> Any:
    """
    Returns a single row from the permutation by offset

    The row is fetched through `take_offsets` as a one-element request, so
    the result is whatever the current transform produces for that batch.
    """
    # A second, unreachable return used to follow this one; only the
    # take_offsets delegation is kept.
    return self.take_offsets([index])
|
||||
|
||||
def __getitems__(self, indices: list[int]) -> Any:
    """
    Returns rows from the permutation by offset

    Delegates to `take_offsets`, which reads the rows with the current
    selection and applies the current transform.
    """
    # The body previously repeated take_offsets' fetch-and-transform logic
    # and was followed by an unreachable return.
    return self.take_offsets(indices)
|
||||
|
||||
@deprecated(details="Use with_skip instead")
|
||||
def skip(self, skip: int) -> "Permutation":
|
||||
|
||||
@@ -92,6 +92,12 @@ def ensure_vector_query(
|
||||
return val
|
||||
|
||||
|
||||
class ColumnOrdering(pydantic.BaseModel):
    # One sort key for a query's ``order_by`` clause.

    # Name of the column to sort by.
    column_name: str
    # Sort ascending when True (the default).
    ascending: bool = True
    # Place nulls first when True; defaults to False.
    nulls_first: bool = False
|
||||
|
||||
|
||||
class FullTextQueryType(str, Enum):
|
||||
MATCH = "match"
|
||||
MATCH_PHRASE = "match_phrase"
|
||||
@@ -504,6 +510,8 @@ class Query(pydantic.BaseModel):
|
||||
# Bypass the vector index and use a brute force search
|
||||
bypass_vector_index: Optional[bool] = None
|
||||
|
||||
order_by: Optional[List[ColumnOrdering]] = None
|
||||
|
||||
@classmethod
|
||||
def from_inner(cls, req: PyQueryRequest) -> Self:
|
||||
query = cls()
|
||||
@@ -524,6 +532,8 @@ class Query(pydantic.BaseModel):
|
||||
query.refine_factor = req.refine_factor
|
||||
query.bypass_vector_index = req.bypass_vector_index
|
||||
query.postfilter = req.postfilter
|
||||
if req.order_by is not None:
|
||||
query.order_by = [ColumnOrdering(**o) for o in req.order_by]
|
||||
if req.full_text_search is not None:
|
||||
query.full_text_query = FullTextSearchQuery(
|
||||
columns=None,
|
||||
@@ -572,9 +582,22 @@ class LanceQueryBuilder(ABC):
|
||||
If "auto", the query type is inferred based on the query.
|
||||
vector_column_name: str
|
||||
The name of the vector column to use for vector search.
|
||||
ordering_field_name: Optional[str]
|
||||
.. deprecated:: 0.27.0
|
||||
Use ``order_by()`` method instead.
|
||||
fts_columns: Optional[Union[str, List[str]]]
|
||||
The columns to search in for full text search.
|
||||
fast_search: bool
|
||||
Skip flat search of unindexed data.
|
||||
"""
|
||||
if ordering_field_name is not None:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"ordering_field_name is deprecated, use .order_by() method instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
# Check hybrid search first as it supports empty query pattern
|
||||
if query_type == "hybrid":
|
||||
# hybrid fts and vector query
|
||||
@@ -671,6 +694,7 @@ class LanceQueryBuilder(ABC):
|
||||
self._text = None
|
||||
self._ef = None
|
||||
self._bypass_vector_index = None
|
||||
self._order_by = None
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.3.1",
|
||||
@@ -947,6 +971,24 @@ class LanceQueryBuilder(ABC):
|
||||
""" # noqa: E501
|
||||
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
|
||||
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
    """
    Choose how the results are sorted.

    Parameters
    ----------
    ordering: Optional[List[ColumnOrdering]]
        Sort keys to apply to the results. Passing None clears any
        previously stored ordering so the default ordering is used.

    Returns
    -------
    LanceQueryBuilder
        This builder, to allow chaining.
    """
    # Only recorded here; subclasses pass it along when building the query.
    self._order_by = ordering
    return self
|
||||
|
||||
def analyze_plan(self) -> str:
|
||||
"""
|
||||
Run the query and return its execution plan with runtime metrics.
|
||||
@@ -1314,6 +1356,7 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
|
||||
fast_search=self._fast_search,
|
||||
ef=self._ef,
|
||||
bypass_vector_index=self._bypass_vector_index,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def to_batches(
|
||||
@@ -1465,7 +1508,9 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
|
||||
super().__init__(table)
|
||||
self._query = query
|
||||
self._phrase_query = False
|
||||
self.ordering_field_name = ordering_field_name
|
||||
# Deprecated compatibility parameter. Native FTS ordering is now
|
||||
# configured through order_by(); LanceQueryBuilder.create emits the warning.
|
||||
_ = ordering_field_name
|
||||
self._reranker = None
|
||||
self._fast_search = fast_search
|
||||
if isinstance(fts_columns, str):
|
||||
@@ -1514,6 +1559,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
|
||||
),
|
||||
offset=self._offset,
|
||||
fast_search=self._fast_search,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def output_schema(self) -> pa.Schema:
|
||||
@@ -1579,6 +1625,7 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
|
||||
limit=self._limit,
|
||||
with_row_id=self._with_row_id,
|
||||
offset=self._offset,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def output_schema(self) -> pa.Schema:
|
||||
@@ -2502,6 +2549,27 @@ class AsyncStandardQuery(AsyncQueryBase):
|
||||
self._inner.offset(offset)
|
||||
return self
|
||||
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
    """
    Choose how the results are sorted.

    Parameters
    ----------
    ordering: Optional[List[ColumnOrdering]]
        Sort keys to apply to the results. If None, the default ordering
        will be used.
    """
    if ordering is None:
        self._inner.order_by(None)
        return self

    # The native query takes plain dicts; use whichever dump method the
    # model object provides.
    payload = []
    for key in ordering:
        dump = key.model_dump if hasattr(key, "model_dump") else key.dict
        payload.append(dump())
    self._inner.order_by(payload)
    return self
|
||||
|
||||
def fast_search(self) -> Self:
|
||||
"""
|
||||
Skip searching un-indexed data.
|
||||
|
||||
@@ -9,6 +9,7 @@ from typing import List, Optional
|
||||
from lancedb import __version__
|
||||
|
||||
from .header import HeaderProvider
|
||||
from .oauth import OAuthConfig, OAuthFlowType
|
||||
|
||||
__all__ = [
|
||||
"TimeoutConfig",
|
||||
@@ -16,6 +17,8 @@ __all__ = [
|
||||
"TlsConfig",
|
||||
"ClientConfig",
|
||||
"HeaderProvider",
|
||||
"OAuthConfig",
|
||||
"OAuthFlowType",
|
||||
]
|
||||
|
||||
|
||||
|
||||
90
python/python/lancedb/remote/oauth.py
Normal file
90
python/python/lancedb/remote/oauth.py
Normal file
@@ -0,0 +1,90 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
class OAuthFlowType(str, Enum):
    """Names of the OAuth flows that can be selected in an ``OAuthConfig``."""

    CLIENT_CREDENTIALS = "client_credentials"
    """Client Credentials grant, for service-to-service (M2M) use."""

    AUTHORIZATION_CODE_PKCE = "authorization_code_pkce"
    """Authorization Code grant with PKCE, for interactive browser sign-in."""

    DEVICE_CODE = "device_code"
    """Device Code grant, for CLI and headless environments."""

    AZURE_MANAGED_IDENTITY = "azure_managed_identity"
    """Azure Managed Identity, obtained via IMDS."""

    WORKLOAD_IDENTITY = "workload_identity"
    """Workload Identity Federation (K8s, GitHub Actions)."""
|
||||
|
||||
|
||||
@dataclass
class OAuthConfig:
    """Settings for OAuth-based authentication against LanceDB.

    The object only carries configuration: it is handed to the Rust layer
    via PyO3, and token acquisition and refresh happen there.

    Parameters
    ----------
    issuer_url : str
        OIDC issuer URL or OAuth authority URL.
        For Azure: ``https://login.microsoftonline.com/{tenant_id}/v2.0``
    client_id : str
        Application / Client ID.
    scopes : List[str]
        OAuth scopes to request.
        For Azure: ``["api://{app_id}/.default"]``
    flow : OAuthFlowType
        Which authentication flow to run. Default: CLIENT_CREDENTIALS.
    client_secret : Optional[str]
        Client secret; required for CLIENT_CREDENTIALS.
    redirect_uri : Optional[str]
        Redirect URI used by the AUTHORIZATION_CODE_PKCE flow.
    callback_port : Optional[int]
        Port of the local HTTP callback server
        (AUTHORIZATION_CODE_PKCE, default: 8400).
    managed_identity_client_id : Optional[str]
        Client ID of a user-assigned managed identity (AZURE_MANAGED_IDENTITY).
    token_file : Optional[str]
        Path to the federated token file (WORKLOAD_IDENTITY).
    refresh_buffer_secs : Optional[int]
        How many seconds before expiry a proactive refresh is triggered
        (default: 300).

    Examples
    --------
    Client Credentials (service-to-service):

    >>> config = OAuthConfig(
    ...     issuer_url="https://login.microsoftonline.com/{tenant}/v2.0",
    ...     client_id="app-id",
    ...     client_secret="secret",
    ...     scopes=["api://lancedb-api/.default"],
    ... )

    Azure Managed Identity:

    >>> config = OAuthConfig(
    ...     issuer_url="https://login.microsoftonline.com/{tenant}/v2.0",
    ...     client_id="app-id",
    ...     scopes=["api://lancedb-api/.default"],
    ...     flow=OAuthFlowType.AZURE_MANAGED_IDENTITY,
    ... )
    """

    # Required for every flow.
    issuer_url: str
    client_id: str
    scopes: List[str]

    # Flow selection and flow-specific options.
    flow: OAuthFlowType = OAuthFlowType.CLIENT_CREDENTIALS
    client_secret: Optional[str] = None
    redirect_uri: Optional[str] = None
    callback_port: Optional[int] = None
    managed_identity_client_id: Optional[str] = None
    token_file: Optional[str] = None
    refresh_buffer_secs: Optional[int] = None
|
||||
@@ -14,6 +14,7 @@ from lancedb._lancedb import (
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -655,6 +656,18 @@ class RemoteTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
    """Drop the given columns by delegating to the wrapped async table.

    The coroutine produced by the async table is executed on the shared
    background loop and its result is returned to the caller.
    """
    pending = self._table.drop_columns(columns)
    return LOOP.run(pending)
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
    """Not supported on LanceDB Cloud.

    The request is still forwarded to the wrapped async table and run on
    the shared background loop.
    """
    pending = self._table.set_unenforced_primary_key(columns)
    return LOOP.run(pending)
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
    """Not supported on LanceDB Cloud.

    The request is still forwarded to the wrapped async table and run on
    the shared background loop.
    """
    pending = self._table.set_lsm_write_spec(spec)
    return LOOP.run(pending)
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
    """Not supported on LanceDB Cloud.

    The request is still forwarded to the wrapped async table and run on
    the shared background loop.
    """
    pending = self._table.unset_lsm_write_spec()
    return LOOP.run(pending)
|
||||
|
||||
def drop_index(self, index_name: str):
    """Drop the index called ``index_name`` via the wrapped async table."""
    pending = self._table.drop_index(index_name)
    return LOOP.run(pending)
|
||||
|
||||
|
||||
@@ -154,6 +154,7 @@ if TYPE_CHECKING:
|
||||
AlterColumnsResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -3263,6 +3264,21 @@ class LanceTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
    """Drop the given columns by delegating to the wrapped async table.

    The coroutine produced by the async table is executed on the shared
    background loop and its result is returned to the caller.
    """
    pending = self._table.drop_columns(columns)
    return LOOP.run(pending)
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
    """Synchronous wrapper that sets the unenforced primary key. See
    [`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]
    for the full contract."""
    pending = self._table.set_unenforced_primary_key(columns)
    return LOOP.run(pending)
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
    """Synchronous wrapper that installs an LsmWriteSpec. See
    [`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]
    for the full contract."""
    pending = self._table.set_lsm_write_spec(spec)
    return LOOP.run(pending)
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
    """Synchronous wrapper that removes the LsmWriteSpec. See
    [`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]
    for the full contract."""
    pending = self._table.unset_lsm_write_spec()
    return LOOP.run(pending)
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
"""
|
||||
Check if the table is using the new v2 manifest paths.
|
||||
@@ -3808,6 +3824,69 @@ class AsyncTable:
|
||||
Any attempt to use the table after it has been closed will raise an error."""
|
||||
return self._inner.close()
|
||||
|
||||
async def set_unenforced_primary_key(
    self, columns: Union[str, Iterable[str]]
) -> None:
    """Set the unenforced primary key for this table.

    "Unenforced" means LanceDB does not check uniqueness on writes; the
    column is recorded in the schema as the primary key so that features
    such as `merge_insert` can use it.

    This wrapper only normalises ``columns`` into a list and forwards it to
    the native table. The constraints below are the ones exercised by the
    test-suite for local tables; they are enforced by the underlying
    implementation, not by this method:

    - exactly one column must be given (compound keys and empty input are
      rejected);
    - the primary key cannot be changed or re-set once it has been
      installed, even to the same column;
    - the column must exist in the schema and have a supported dtype.

    Parameters
    ----------
    columns : str or Iterable[str]
        Either a single column name or an iterable holding the column
        name. A bare string is wrapped into a one-element list; any other
        iterable is materialised with ``list``. The column dtype must be
        one of: int32, int64, utf8, large_utf8, binary, large_binary,
        fixed_size_binary.
    """
    # A str is itself iterable, so it must be special-cased before list();
    # otherwise "id" would turn into ["i", "d"].
    names = [columns] if isinstance(columns, str) else list(columns)
    await self._inner.set_unenforced_primary_key(names)
|
||||
|
||||
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
    """Install an LsmWriteSpec on this table.

    Installing a spec selects Lance's MemWAL LSM-style write path for
    subsequent `merge_insert` calls. An ``LsmWriteSpec`` picks one of three
    sharding strategies:

    - ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
      the single-column unenforced primary key.
    - ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
      scalar column.
    - ``LsmWriteSpec.unsharded()`` — route every write to a single shard.

    Every variant requires an unenforced primary key to have been set on
    the table first (see
    [`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]);
    for bucket sharding the bucketed column must additionally be that
    primary-key column.

    Parameters
    ----------
    spec : LsmWriteSpec
        The sharding spec to install.

    Examples
    --------
    >>> from lancedb._lancedb import LsmWriteSpec
    >>> # table.set_unenforced_primary_key("id")
    >>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
    """
    native_table = self._inner
    await native_table.set_lsm_write_spec(spec)
|
||||
|
||||
async def unset_lsm_write_spec(self) -> None:
    """Remove the LsmWriteSpec from this table.

    After removal `merge_insert` goes back to the standard write path.
    An error is raised when the table has no spec installed.
    """
    native_table = self._inner
    await native_table.unset_lsm_write_spec()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table."""
|
||||
@@ -4512,6 +4591,8 @@ class AsyncTable:
|
||||
async_query = async_query.fast_search()
|
||||
if query.with_row_id:
|
||||
async_query = async_query.with_row_id()
|
||||
if query.order_by:
|
||||
async_query = async_query.order_by(query.order_by)
|
||||
|
||||
if query.vector:
|
||||
async_query = async_query.nearest_to(query.vector).distance_range(
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
segmenter:
|
||||
mode: "normal"
|
||||
dictionary:
|
||||
path: "./python/tests/models/lindera/ipadic/main"
|
||||
dictionary: "./python/tests/models/lindera/ipadic/main"
|
||||
|
||||
Binary file not shown.
@@ -29,6 +29,7 @@ from lancedb.query import (
|
||||
MultiMatchQuery,
|
||||
PhraseQuery,
|
||||
BooleanQuery,
|
||||
ColumnOrdering,
|
||||
Occur,
|
||||
LanceFtsQueryBuilder,
|
||||
)
|
||||
@@ -116,8 +117,7 @@ def lindera_ipadic(language_model_home):
|
||||
config_path.write_text(
|
||||
"segmenter:\n"
|
||||
' mode: "normal"\n'
|
||||
" dictionary:\n"
|
||||
f' path: "{extracted_model.resolve().as_posix()}"\n',
|
||||
f' dictionary: "{extracted_model.resolve().as_posix()}"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
@@ -500,6 +500,36 @@ async def test_search_fts_specify_column_async(async_table):
|
||||
pass
|
||||
|
||||
|
||||
def test_search_order_by_descending(table):
    """FTS hits ordered by ``count`` descending come back in that order."""
    table.create_fts_index("text")
    ordering = [ColumnOrdering(column_name="count", ascending=False)]
    query = table.search("puppy").order_by(ordering).limit(20)
    rows = query.select(["text", "count"]).to_list()

    # Every hit must actually match the search term.
    for hit in rows:
        assert "puppy" in hit["text"]
    # A stable sort leaves an already-ordered list untouched.
    expected = sorted(rows, key=lambda hit: hit["count"], reverse=True)
    assert expected == rows
|
||||
|
||||
|
||||
def test_search_order_by_ascending(table):
    """FTS hits ordered by ``count`` ascending come back in that order."""
    table.create_fts_index("text")
    ordering = [ColumnOrdering(column_name="count", ascending=True)]
    query = table.search("puppy").order_by(ordering).limit(20)
    rows = query.select(["text", "count"]).to_list()

    # Every hit must actually match the search term.
    for hit in rows:
        assert "puppy" in hit["text"]
    # A stable sort leaves an already-ordered list untouched.
    expected = sorted(rows, key=lambda hit: hit["count"])
    assert expected == rows
|
||||
|
||||
|
||||
def test_create_index_from_table(tmp_path, table):
|
||||
table.create_fts_index("text")
|
||||
df = table.search("puppy").limit(5).select(["text"]).to_pandas()
|
||||
|
||||
149
python/python/tests/test_lsm_write_spec.py
Normal file
149
python/python/tests/test_lsm_write_spec.py
Normal file
@@ -0,0 +1,149 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for installing and clearing an LsmWriteSpec via
|
||||
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from lancedb._lancedb import LsmWriteSpec
|
||||
|
||||
SCHEMA = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.utf8(), nullable=False),
|
||||
pa.field("v", pa.int32(), nullable=False),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _batch(ids, vs):
    """Build a record batch matching ``SCHEMA`` from id and value lists."""
    id_array = pa.array(ids, type=pa.utf8())
    value_array = pa.array(vs, type=pa.int32())
    return pa.RecordBatch.from_arrays([id_array, value_array], schema=SCHEMA)
|
||||
|
||||
|
||||
def _reader(ids, vs):
    """Wrap a single ``_batch`` in a record-batch reader over ``SCHEMA``."""
    batches = [_batch(ids, vs)]
    return pa.RecordBatchReader.from_batches(SCHEMA, batches)
|
||||
|
||||
|
||||
def _make_table(tmp_path):
    """Create a local database under ``tmp_path`` with one seeded table.

    Returns the ``(db, table)`` pair; the table is named ``"t"`` and holds
    a single ``("seed", 0)`` row.
    """
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(tmp_path, read_consistency_interval=no_staleness)
    seed_rows = _reader(["seed"], [0])
    return db, db.create_table("t", seed_rows)
|
||||
|
||||
|
||||
def test_set_lsm_write_spec_validates(tmp_path):
    """Invalid specs are rejected and an installed spec cannot be mutated."""
    _db, table = _make_table(tmp_path)

    # Without a primary key no spec can be installed.
    with pytest.raises(Exception, match="primary key"):
        table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))

    table.set_unenforced_primary_key("id")

    # The bucketed column has to be the primary-key column.
    with pytest.raises(Exception, match="match"):
        table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))

    # Bucket counts outside the accepted range, on both sides.
    for bad_count in (0, 1025):
        with pytest.raises(Exception, match="num_buckets"):
            table.set_lsm_write_spec(LsmWriteSpec.bucket("id", bad_count))

    # A valid spec installs; changing it afterwards is refused.
    table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
    with pytest.raises(Exception, match="mutation"):
        table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
||||
|
||||
|
||||
def test_unset_lsm_write_spec(tmp_path):
    """Unsetting requires an installed spec and frees the slot for a new one."""
    _db, table = _make_table(tmp_path)

    # Nothing installed yet, so there is nothing to remove.
    with pytest.raises(Exception, match="no LSM write spec"):
        table.unset_lsm_write_spec()

    # Install, then remove.
    table.set_unenforced_primary_key("id")
    first_spec = LsmWriteSpec.bucket("id", 4)
    table.set_lsm_write_spec(first_spec)
    table.unset_lsm_write_spec()

    # Removing twice fails the second time.
    with pytest.raises(Exception, match="no LSM write spec"):
        table.unset_lsm_write_spec()

    # With the old spec gone a different one is accepted.
    second_spec = LsmWriteSpec.bucket("id", 8)
    table.set_lsm_write_spec(second_spec)
|
||||
|
||||
|
||||
def test_set_unsharded_spec(tmp_path):
    """An unsharded spec can be installed and removed once a PK exists."""
    _db, table = _make_table(tmp_path)
    # Lance MemWAL needs a primary key on the dataset even when unsharded;
    # the unsharded variant only skips per-row hashing.
    table.set_unenforced_primary_key("id")
    spec = LsmWriteSpec.unsharded()
    table.set_lsm_write_spec(spec)
    table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_repr():
    """Getters and ``repr`` expose the variant-specific fields."""
    bucketed = LsmWriteSpec.bucket("id", 4)
    assert bucketed.spec_type == "bucket"
    assert bucketed.column == "id"
    assert bucketed.num_buckets == 4
    assert bucketed.maintained_indexes == []
    for fragment in ("bucket", "id", "4"):
        assert fragment in repr(bucketed)

    plain = LsmWriteSpec.unsharded()
    assert plain.spec_type == "unsharded"
    assert plain.column is None
    assert plain.num_buckets is None
    assert "unsharded" in repr(plain)
|
||||
|
||||
|
||||
def test_lsm_write_spec_with_maintained_indexes():
    """``with_maintained_indexes`` records the given index names in order."""
    index_names = ["idx_a", "idx_b"]
    spec = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(index_names)
    assert spec.maintained_indexes == ["idx_a", "idx_b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_async_set_unset_lsm_write_spec(tmp_path):
    """The async API installs and removes a spec; a repeat removal fails."""
    no_staleness = timedelta(seconds=0)
    db = await lancedb.connect_async(tmp_path, read_consistency_interval=no_staleness)
    seed_rows = pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])])
    table = await db.create_table("t", seed_rows)

    await table.set_unenforced_primary_key("id")
    await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
    await table.unset_lsm_write_spec()
    # Nothing is left to remove the second time around.
    with pytest.raises(Exception, match="no LSM write spec"):
        await table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_set_identity_spec(tmp_path):
    """An identity spec on a non-PK column can be installed and removed."""
    _db, table = _make_table(tmp_path)
    # Identity sharding shards by the raw value of the chosen column, but
    # the table must still carry an unenforced primary key.
    table.set_unenforced_primary_key("id")
    spec = LsmWriteSpec.identity("v")
    table.set_lsm_write_spec(spec)
    table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_identity_and_writer_config_defaults():
    """Identity getters work and writer-config defaults show up in ``repr``."""
    spec = LsmWriteSpec.identity("v")
    assert spec.spec_type == "identity"
    assert spec.column == "v"
    assert spec.num_buckets is None
    assert "identity" in repr(spec)

    configured = spec.with_writer_config_defaults({"durable_write": "false"})
    assert configured.writer_config_defaults == {"durable_write": "false"}
    assert "durable_write" in repr(configured)
|
||||
@@ -1080,3 +1080,29 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
|
||||
"""Test __getitems__ with an out-of-range offset raises an error."""
|
||||
with pytest.raises(Exception):
|
||||
some_permutation.__getitems__([999999])
|
||||
|
||||
|
||||
def test_take_offsets(some_permutation: Permutation):
    """``take_offsets`` returns one dict-like row per requested offset."""
    rows = some_permutation.take_offsets([0, 1, 2])

    assert isinstance(rows, list)
    first_row = rows[0]
    assert "id" in first_row
    assert "value" in first_row
    assert len(rows) == 3
|
||||
|
||||
|
||||
def test_take_offsets_empty_identity_permutation(mem_db):
    """An identity permutation yields an empty list for no offsets."""
    data = pa.table({"id": range(10), "value": range(10)})
    tbl = mem_db.create_table("test_table", data)
    identity = Permutation.identity(tbl)

    assert identity.take_offsets([]) == []
|
||||
|
||||
|
||||
def test_take_offsets_empty_permutation(some_permutation: Permutation):
    """A regular permutation yields an empty list for no offsets."""
    rows = some_permutation.take_offsets([])
    assert rows == []
|
||||
|
||||
79
python/python/tests/test_primary_key.py
Normal file
79
python/python/tests/test_primary_key.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for Table.set_unenforced_primary_key."""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
|
||||
|
||||
def _empty_table(path, schema):
    """Create an empty table named ``"t"`` with ``schema`` in a fresh local DB."""
    no_staleness = timedelta(seconds=0)
    db = lancedb.connect(path, read_consistency_interval=no_staleness)
    return db.create_table("t", schema=schema)
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
    """Both a bare string and a one-element list are accepted as the key."""
    schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])

    # Each form gets its own database directory: "s" for the bare string,
    # "l" for the one-element list.
    for subdir, key in (("s", "id"), ("l", ["id"])):
        table = _empty_table(tmp_path / subdir, schema)
        table.set_unenforced_primary_key(key)
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
    """Multi-column keys and empty column lists are both refused."""
    schema = pa.schema(
        [
            pa.field("a", pa.utf8(), nullable=False),
            pa.field("b", pa.int64(), nullable=False),
        ]
    )
    table = _empty_table(tmp_path, schema)

    # More than one column is a compound key, which is unsupported.
    with pytest.raises(Exception, match="compound"):
        table.set_unenforced_primary_key(["a", "b"])
    # No columns at all.
    with pytest.raises(Exception, match="required"):
        table.set_unenforced_primary_key([])
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_is_immutable(tmp_path):
    """Once installed, the key can neither be changed nor re-applied."""
    schema = pa.schema(
        [
            pa.field("a", pa.utf8(), nullable=False),
            pa.field("b", pa.int64(), nullable=False),
        ]
    )
    table = _empty_table(tmp_path, schema)
    table.set_unenforced_primary_key("a")

    # First a different column, then the very same column again.
    for column in ("b", "a"):
        with pytest.raises(Exception, match="already set"):
            table.set_unenforced_primary_key(column)
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_validates(tmp_path):
    """Unknown columns and unsupported dtypes are rejected."""
    utf8_schema = pa.schema([pa.field("id", pa.utf8(), nullable=False)])
    table = _empty_table(tmp_path / "t", utf8_schema)
    # The named column does not exist in the schema.
    with pytest.raises(Exception, match="not found"):
        table.set_unenforced_primary_key("nonexistent")

    # Float32 is outside the supported set of key dtypes.
    float_schema = pa.schema([pa.field("id", pa.float32(), nullable=False)])
    bad = _empty_table(tmp_path / "bad", float_schema)
    with pytest.raises(Exception, match="not supported"):
        bad.set_unenforced_primary_key("id")
|
||||
@@ -25,6 +25,7 @@ from lancedb.query import (
|
||||
AsyncHybridQuery,
|
||||
AsyncQueryBase,
|
||||
AsyncVectorQuery,
|
||||
ColumnOrdering,
|
||||
LanceVectorQueryBuilder,
|
||||
MatchQuery,
|
||||
PhraseQuery,
|
||||
@@ -164,6 +165,71 @@ def test_offset(table):
|
||||
assert len(results_with_offset.to_pandas()) == 1
|
||||
|
||||
|
||||
def test_order_by_plain_query(mem_db):
    """A plain query honours a multi-column ordering, including null placement."""
    data = pa.table(
        {
            "group": [1, 1, 1, 2],
            "score": [None, 1.0, 1.0, 0.5],
            "name": ["z", "b", "a", "c"],
        }
    )
    table = mem_db.create_table("test_order_by", data)

    ordering = [
        ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
        ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
        ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
    ]
    res = table.search().order_by(ordering).to_arrow()

    # Within group 1 the null score sorts first, then ties break on name.
    expected = [
        {"group": 1, "score": None, "name": "z"},
        {"group": 1, "score": 1.0, "name": "a"},
        {"group": 1, "score": 1.0, "name": "b"},
        {"group": 2, "score": 0.5, "name": "c"},
    ]
    assert res.select(["group", "score", "name"]).to_pylist() == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_order_by_async_query(mem_db_async: AsyncConnection):
    """An async query honours a multi-column ordering, including null placement."""
    data = pa.table(
        {
            "group": [1, 1, 1, 2],
            "score": [None, 1.0, 1.0, 0.5],
            "name": ["z", "b", "a", "c"],
        }
    )
    table = await mem_db_async.create_table("test_order_by_async", data)

    ordering = [
        ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
        ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
        ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
    ]
    res = await table.query().order_by(ordering).to_arrow()

    # Within group 1 the null score sorts first, then ties break on name.
    expected = [
        {"group": 1, "score": None, "name": "z"},
        {"group": 1, "score": 1.0, "name": "a"},
        {"group": 1, "score": 1.0, "name": "b"},
        {"group": 2, "score": 0.5, "name": "c"},
    ]
    assert res.select(["group", "score", "name"]).to_pylist() == expected
|
||||
|
||||
|
||||
def test_query_builder(table):
|
||||
rs = (
|
||||
LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
|
||||
@@ -16,6 +16,7 @@ from packaging.version import Version
|
||||
|
||||
import lancedb
|
||||
from lancedb.conftest import MockTextEmbeddingFunction
|
||||
from lancedb.query import ColumnOrdering
|
||||
from lancedb.remote import ClientConfig
|
||||
from lancedb.remote.errors import HttpError, RetryError
|
||||
import pytest
|
||||
@@ -660,6 +661,18 @@ def test_query_sync_maximal():
|
||||
"ef": None,
|
||||
"filter": "id > 0",
|
||||
"columns": ["id", "name"],
|
||||
"order_by": [
|
||||
{
|
||||
"column_name": "score",
|
||||
"ascending": False,
|
||||
"nulls_first": True,
|
||||
},
|
||||
{
|
||||
"column_name": "id",
|
||||
"ascending": True,
|
||||
"nulls_first": False,
|
||||
},
|
||||
],
|
||||
"vector_column": "vector2",
|
||||
"fast_search": True,
|
||||
"with_row_id": True,
|
||||
@@ -677,6 +690,14 @@ def test_query_sync_maximal():
|
||||
.refine_factor(10)
|
||||
.nprobes(5)
|
||||
.where("id > 0", prefilter=True)
|
||||
.order_by(
|
||||
[
|
||||
ColumnOrdering(
|
||||
column_name="score", ascending=False, nulls_first=True
|
||||
),
|
||||
ColumnOrdering(column_name="id", ascending=True, nulls_first=False),
|
||||
]
|
||||
)
|
||||
.with_row_id(True)
|
||||
.select(["id", "name"])
|
||||
.to_list()
|
||||
|
||||
@@ -539,7 +539,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None, oauth_config=None))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn connect(
|
||||
py: Python<'_>,
|
||||
@@ -553,6 +553,7 @@ pub fn connect(
|
||||
session: Option<crate::session::Session>,
|
||||
manifest_enabled: bool,
|
||||
namespace_client_properties: Option<HashMap<String, String>>,
|
||||
oauth_config: Option<crate::oauth::PyOAuthConfig>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
future_into_py(py, async move {
|
||||
let mut builder = lancedb::connect(&uri);
|
||||
@@ -582,6 +583,10 @@ pub fn connect(
|
||||
if let Some(client_config) = client_config {
|
||||
builder = builder.client_config(client_config.into());
|
||||
}
|
||||
if let Some(oauth_config) = oauth_config {
|
||||
let config: lancedb::remote::oauth::OAuthConfig = oauth_config.into();
|
||||
builder = builder.oauth_config(config);
|
||||
}
|
||||
if let Some(session) = session {
|
||||
builder = builder.session(session.inner.clone());
|
||||
}
|
||||
|
||||
@@ -15,8 +15,8 @@ use pyo3::{
|
||||
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
|
||||
Table, UpdateResult,
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -26,6 +26,7 @@ pub mod expr;
|
||||
pub mod header;
|
||||
pub mod index;
|
||||
pub mod namespace;
|
||||
pub mod oauth;
|
||||
pub mod permutation;
|
||||
pub mod query;
|
||||
pub mod runtime;
|
||||
@@ -52,6 +53,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
m.add_class::<DeleteResult>()?;
|
||||
m.add_class::<DropColumnsResult>()?;
|
||||
m.add_class::<UpdateResult>()?;
|
||||
|
||||
53
python/src/oauth.rs
Normal file
53
python/src/oauth.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use pyo3::FromPyObject;
|
||||
|
||||
use lancedb::remote::oauth::{OAuthConfig, OAuthFlow};
|
||||
|
||||
/// Python-side OAuth configuration, extracted via FromPyObject.
|
||||
/// Maps to `lancedb.remote.oauth.OAuthConfig` Python dataclass.
|
||||
#[derive(FromPyObject)]
|
||||
pub struct PyOAuthConfig {
|
||||
pub issuer_url: String,
|
||||
pub client_id: String,
|
||||
pub scopes: Vec<String>,
|
||||
pub flow: String,
|
||||
pub client_secret: Option<String>,
|
||||
pub redirect_uri: Option<String>,
|
||||
pub callback_port: Option<u16>,
|
||||
pub managed_identity_client_id: Option<String>,
|
||||
pub token_file: Option<String>,
|
||||
pub refresh_buffer_secs: Option<u64>,
|
||||
}
|
||||
|
||||
impl From<PyOAuthConfig> for OAuthConfig {
|
||||
fn from(py: PyOAuthConfig) -> Self {
|
||||
let flow = match py.flow.as_str() {
|
||||
"client_credentials" => OAuthFlow::ClientCredentials,
|
||||
"authorization_code_pkce" => OAuthFlow::AuthorizationCodePKCE {
|
||||
redirect_uri: py.redirect_uri,
|
||||
callback_port: py.callback_port,
|
||||
},
|
||||
"device_code" => OAuthFlow::DeviceCode,
|
||||
"azure_managed_identity" => OAuthFlow::AzureManagedIdentity {
|
||||
client_id: py.managed_identity_client_id,
|
||||
},
|
||||
"workload_identity" => OAuthFlow::WorkloadIdentity {
|
||||
token_file: py
|
||||
.token_file
|
||||
.expect("token_file is required for workload_identity flow"),
|
||||
},
|
||||
other => panic!("Unknown OAuth flow type: {other}"),
|
||||
};
|
||||
|
||||
OAuthConfig {
|
||||
issuer_url: py.issuer_url,
|
||||
client_id: py.client_id,
|
||||
client_secret: py.client_secret,
|
||||
scopes: py.scopes,
|
||||
flow,
|
||||
refresh_buffer_secs: py.refresh_buffer_secs,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,7 @@ use lancedb::query::QueryBase;
|
||||
use lancedb::query::QueryExecutionOptions;
|
||||
use lancedb::query::QueryFilter;
|
||||
use lancedb::query::{
|
||||
ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
|
||||
ColumnOrdering, ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
|
||||
VectorQuery as LanceDbVectorQuery,
|
||||
};
|
||||
use lancedb::table::AnyQuery;
|
||||
@@ -207,6 +207,48 @@ impl<'py> IntoPyObject<'py> for PyLanceDB<FtsQuery> {
|
||||
#[derive(Clone)]
|
||||
pub struct PyQueryVectors(Vec<Arc<dyn Array>>);
|
||||
|
||||
#[derive(Clone, FromPyObject)]
|
||||
#[pyo3(from_item_all)]
|
||||
pub struct PyColumnOrdering {
|
||||
pub column_name: String,
|
||||
pub ascending: bool,
|
||||
pub nulls_first: bool,
|
||||
}
|
||||
|
||||
impl From<ColumnOrdering> for PyColumnOrdering {
|
||||
fn from(ordering: ColumnOrdering) -> Self {
|
||||
Self {
|
||||
column_name: ordering.column_name,
|
||||
ascending: ordering.ascending,
|
||||
nulls_first: ordering.nulls_first,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PyColumnOrdering> for ColumnOrdering {
|
||||
fn from(ordering: PyColumnOrdering) -> Self {
|
||||
Self {
|
||||
column_name: ordering.column_name,
|
||||
ascending: ordering.ascending,
|
||||
nulls_first: ordering.nulls_first,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for PyColumnOrdering {
|
||||
type Target = PyDict;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
type Error = PyErr;
|
||||
|
||||
fn into_pyobject(self, py: pyo3::Python<'py>) -> PyResult<Self::Output> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("column_name", self.column_name)?;
|
||||
dict.set_item("ascending", self.ascending)?;
|
||||
dict.set_item("nulls_first", self.nulls_first)?;
|
||||
Ok(dict)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for PyQueryVectors {
|
||||
type Target = PyList;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
@@ -246,6 +288,7 @@ pub struct PyQueryRequest {
|
||||
pub bypass_vector_index: Option<bool>,
|
||||
pub postfilter: Option<bool>,
|
||||
pub norm: Option<String>,
|
||||
pub order_by: Option<Vec<PyColumnOrdering>>,
|
||||
}
|
||||
|
||||
impl From<AnyQuery> for PyQueryRequest {
|
||||
@@ -273,6 +316,9 @@ impl From<AnyQuery> for PyQueryRequest {
|
||||
bypass_vector_index: None,
|
||||
postfilter: None,
|
||||
norm: None,
|
||||
order_by: query_request
|
||||
.order_by
|
||||
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
|
||||
},
|
||||
AnyQuery::VectorQuery(vector_query) => Self {
|
||||
limit: vector_query.base.limit,
|
||||
@@ -297,6 +343,10 @@ impl From<AnyQuery> for PyQueryRequest {
|
||||
bypass_vector_index: Some(!vector_query.use_index),
|
||||
postfilter: Some(!vector_query.base.prefilter),
|
||||
norm: vector_query.base.norm.map(|n| n.to_string()),
|
||||
order_by: vector_query
|
||||
.base
|
||||
.order_by
|
||||
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -475,6 +525,13 @@ impl Query {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[pyo3(signature = ())]
|
||||
pub fn output_schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
@@ -647,6 +704,13 @@ impl FTSQuery {
|
||||
self.inner = self.inner.clone().offset(offset as usize);
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner = self.inner.clone().fast_search();
|
||||
}
|
||||
@@ -782,6 +846,13 @@ impl VectorQuery {
|
||||
self.inner = self.inner.clone().offset(offset as usize);
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner = self.inner.clone().fast_search();
|
||||
}
|
||||
@@ -954,6 +1025,12 @@ impl HybridQuery {
|
||||
self.inner_fts.offset(offset);
|
||||
}
|
||||
|
||||
/// Applies the same ordering to both halves of the hybrid query.
///
/// The vector half receives a clone and is updated first; the FTS half
/// then takes ownership of the original value.
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
    let for_vector = ordering.clone();
    self.inner_vec.order_by(for_vector)?;
    self.inner_fts.order_by(ordering)
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner_vec.fast_search();
|
||||
self.inner_fts.fast_search();
|
||||
|
||||
@@ -171,6 +171,141 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `merge_insert`.
|
||||
///
|
||||
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
|
||||
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
|
||||
/// `with_writer_config_defaults(...)`.
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
inner: lancedb::table::LsmWriteSpec,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
#[staticmethod]
|
||||
pub fn bucket(column: String, num_buckets: u32) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
|
||||
}
|
||||
}
|
||||
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
#[staticmethod]
|
||||
pub fn identity(column: String) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::identity(column),
|
||||
}
|
||||
}
|
||||
|
||||
/// No sharding — every `merge_insert` call writes to a single
|
||||
/// MemWAL shard.
|
||||
#[staticmethod]
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::unsharded(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the list of indexes the MemWAL should keep up to date as
|
||||
/// rows are appended. Each name must reference an index that
|
||||
/// already exists on the table at the time `set_lsm_write_spec`
|
||||
/// is called.
|
||||
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_maintained_indexes(indexes),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the default `ShardWriter` configuration recorded in the
|
||||
/// MemWAL index, so every writer starts from the same defaults.
|
||||
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_writer_config_defaults(defaults),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, num_buckets, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Discriminator string identifying the variant ("bucket", "identity",
|
||||
/// or "unsharded").
|
||||
#[getter]
|
||||
pub fn spec_type(&self) -> &'static str {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
|
||||
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket and identity variants: the sharding column. `None` for unsharded.
|
||||
#[getter]
|
||||
pub fn column(&self) -> Option<String> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { column, .. }
|
||||
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket variant only: the number of buckets.
|
||||
#[getter]
|
||||
pub fn num_buckets(&self) -> Option<u32> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
#[getter]
|
||||
pub fn maintained_indexes(&self) -> Vec<String> {
|
||||
self.inner.maintained_indexes().to_vec()
|
||||
}
|
||||
|
||||
/// Default `ShardWriter` configuration recorded by this spec.
|
||||
#[getter]
|
||||
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
|
||||
self.inner.writer_config_defaults().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
fn from(spec: LsmWriteSpec) -> Self {
|
||||
spec.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddColumnsResult {
|
||||
@@ -805,6 +940,37 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_unenforced_primary_key<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
columns: Vec<String>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_lsm_write_spec<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
spec: LsmWriteSpec,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.set_lsm_write_spec(native_spec).await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.unset_lsm_write_spec().await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.28.0-beta.11"
|
||||
version = "0.29.1-beta.0"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
@@ -75,6 +75,11 @@ reqwest = { version = "0.12.0", default-features = false, features = [
|
||||
"stream",
|
||||
], optional = true }
|
||||
http = { version = "1", optional = true } # Matching what is in reqwest
|
||||
# OAuth dependencies (used by remote feature)
|
||||
sha2 = { version = "0.10", optional = true }
|
||||
base64 = { version = "0.22", optional = true }
|
||||
urlencoding = { version = "2", optional = true }
|
||||
open = { version = "5", optional = true }
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
|
||||
polars = { version = ">=0.37,<0.40.0", optional = true }
|
||||
@@ -128,7 +133,7 @@ huggingface = [
|
||||
"lance-namespace-impls/dir-huggingface",
|
||||
]
|
||||
dynamodb = ["lance/dynamodb", "aws"]
|
||||
remote = ["dep:reqwest", "dep:http", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
|
||||
remote = ["dep:reqwest", "dep:http", "dep:sha2", "dep:base64", "dep:urlencoding", "dep:open", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
|
||||
fp16kernels = ["lance-linalg/fp16kernels"]
|
||||
s3-test = []
|
||||
bedrock = ["dep:aws-sdk-bedrockruntime"]
|
||||
|
||||
@@ -622,6 +622,8 @@ pub struct ConnectRequest {
|
||||
pub struct ConnectBuilder {
|
||||
request: ConnectRequest,
|
||||
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||
#[cfg(feature = "remote")]
|
||||
oauth_config: Option<crate::remote::oauth::OAuthConfig>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "remote")]
|
||||
@@ -643,6 +645,8 @@ impl ConnectBuilder {
|
||||
session: None,
|
||||
},
|
||||
embedding_registry: None,
|
||||
#[cfg(feature = "remote")]
|
||||
oauth_config: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,6 +735,19 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure OAuth authentication for LanceDB Cloud/Enterprise.
///
/// This method only stores the config on the builder. When the remote
/// connection is built, an
/// [`OAuthHeaderProvider`](crate::remote::OAuthHeaderProvider) is created
/// from it and installed as the client's header provider, replacing any
/// header provider already set in the client config.
///
/// With OAuth configured an API key is no longer required; if one is
/// supplied it is still passed through to the remote database.
///
/// Token acquisition and refresh are handled entirely in Rust.
#[cfg(feature = "remote")]
pub fn oauth_config(mut self, config: crate::remote::oauth::OAuthConfig) -> Self {
    self.oauth_config = Some(config);
    self
}
|
||||
|
||||
/// Provide a custom [`EmbeddingRegistry`] to use for this connection.
|
||||
pub fn embedding_registry(mut self, registry: Arc<dyn EmbeddingRegistry>) -> Self {
|
||||
self.embedding_registry = Some(registry);
|
||||
@@ -874,9 +891,29 @@ impl ConnectBuilder {
|
||||
let region = options.region.ok_or_else(|| Error::InvalidInput {
|
||||
message: "A region is required when connecting to LanceDb Cloud".to_string(),
|
||||
})?;
|
||||
let api_key = options.api_key.ok_or_else(|| Error::InvalidInput {
|
||||
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
|
||||
})?;
|
||||
|
||||
// When OAuth is configured, api_key is not required
|
||||
let api_key = match (&self.oauth_config, &options.api_key) {
|
||||
(Some(_), None) => String::new(),
|
||||
(Some(_), Some(key)) => key.clone(),
|
||||
(None, Some(key)) => key.clone(),
|
||||
(None, None) => {
|
||||
return Err(Error::InvalidInput {
|
||||
message:
|
||||
"An api_key or oauth_config is required when connecting to LanceDb Cloud"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let mut client_config = self.request.client_config;
|
||||
|
||||
// Apply OAuth header provider if configured
|
||||
if let Some(oauth_config) = self.oauth_config {
|
||||
let provider = crate::remote::oauth::OAuthHeaderProvider::new(oauth_config)?;
|
||||
client_config.header_provider =
|
||||
Some(Arc::new(provider) as Arc<dyn crate::remote::client::HeaderProvider>);
|
||||
}
|
||||
|
||||
let storage_options = StorageOptions(options.storage_options.clone());
|
||||
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
|
||||
@@ -884,7 +921,7 @@ impl ConnectBuilder {
|
||||
&api_key,
|
||||
&region,
|
||||
options.host_override,
|
||||
self.request.client_config,
|
||||
client_config,
|
||||
storage_options.into(),
|
||||
)?);
|
||||
Ok(Connection {
|
||||
|
||||
@@ -450,6 +450,10 @@ impl PermutationReader {
|
||||
}
|
||||
|
||||
pub async fn take_offsets(&self, offsets: &[u64], selection: Select) -> Result<RecordBatch> {
|
||||
if offsets.is_empty() {
|
||||
return Ok(RecordBatch::new_empty(self.output_schema(selection).await?));
|
||||
}
|
||||
|
||||
if let Some(permutation_table) = &self.permutation_table {
|
||||
let offset_map = self.get_offset_map(permutation_table).await?;
|
||||
let row_ids = offsets
|
||||
@@ -955,4 +959,62 @@ mod tests {
|
||||
.to_vec();
|
||||
assert_eq!(idx_values, &all_idx_values[4997..5000]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_identity_reader() {
|
||||
let base_table = lance_datagen::gen_batch()
|
||||
.col("idx", lance_datagen::array::step::<Int32Type>())
|
||||
.into_mem_table("tbl", RowCount::from(10), BatchCount::from(1))
|
||||
.await;
|
||||
|
||||
let reader = PermutationReader::identity(base_table.base_table().clone()).await;
|
||||
|
||||
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.num_columns(), 1);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_with_permutation_table() {
|
||||
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
|
||||
|
||||
let reader = PermutationReader::try_from_tables(
|
||||
base_table.base_table().clone(),
|
||||
row_ids_table.base_table().clone(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.schema().fields().len(), 2);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
assert_eq!(batch.schema().field(1).name(), "other_col");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_with_column_selection() {
|
||||
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
|
||||
|
||||
let reader = PermutationReader::try_from_tables(
|
||||
base_table.base_table().clone(),
|
||||
row_ids_table.base_table().clone(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = reader
|
||||
.take_offsets(&[], Select::Columns(vec!["idx".to_string()]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.num_columns(), 1);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ use datafusion_expr::Expr;
|
||||
use datafusion_physical_plan::ExecutionPlan;
|
||||
use futures::{FutureExt, TryFutureExt, TryStreamExt, stream, try_join};
|
||||
use half::f16;
|
||||
/// Re-export Lance ColumnOrdering type for use in query ordering
|
||||
pub use lance::dataset::scanner::ColumnOrdering;
|
||||
use lance::dataset::{ROW_ID, scanner::DatasetRecordBatchStream};
|
||||
use lance_arrow::RecordBatchExt;
|
||||
use lance_datafusion::exec::execute_plan;
|
||||
@@ -510,6 +512,11 @@ pub trait QueryBase {
|
||||
/// the scores are converted to ranks and then normalized. If "Score", the
|
||||
/// scores are normalized directly.
|
||||
fn norm(self, norm: NormalizeMethod) -> Self;
|
||||
|
||||
/// Sort the results by the specified column(s).
|
||||
///
|
||||
/// This allows ordering query results by one or more columns in either ascending or descending order.
|
||||
fn order_by(self, ordering: Option<Vec<ColumnOrdering>>) -> Self;
|
||||
}
|
||||
|
||||
pub trait HasQuery {
|
||||
@@ -574,6 +581,11 @@ impl<T: HasQuery> QueryBase for T {
|
||||
self.mut_query().norm = Some(norm);
|
||||
self
|
||||
}
|
||||
|
||||
// Stores `ordering` on the underlying query request, replacing any
// previously set ordering (`None` clears it).
fn order_by(mut self, ordering: Option<Vec<ColumnOrdering>>) -> Self {
    let request = self.mut_query();
    request.order_by = ordering;
    self
}
|
||||
}
|
||||
|
||||
/// Options for controlling the execution of a query
|
||||
@@ -750,6 +762,11 @@ pub struct QueryRequest {
|
||||
///
|
||||
/// By default, this is false (scoring columns are auto-projected for backward compatibility).
|
||||
pub disable_scoring_autoprojection: bool,
|
||||
|
||||
/// Sort the results by the specified column(s).
|
||||
///
|
||||
/// This allows ordering query results by one or more columns in either ascending or descending order.
|
||||
pub order_by: Option<Vec<ColumnOrdering>>,
|
||||
}
|
||||
|
||||
impl Default for QueryRequest {
|
||||
@@ -766,6 +783,7 @@ impl Default for QueryRequest {
|
||||
reranker: None,
|
||||
norm: None,
|
||||
disable_scoring_autoprojection: false,
|
||||
order_by: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
pub(crate) mod client;
|
||||
pub(crate) mod db;
|
||||
pub mod oauth;
|
||||
mod retry;
|
||||
pub(crate) mod table;
|
||||
pub(crate) mod util;
|
||||
@@ -20,3 +21,4 @@ const JSON_CONTENT_TYPE: &str = "application/json";
|
||||
|
||||
pub use client::{ClientConfig, HeaderProvider, RetryConfig, TimeoutConfig, TlsConfig};
|
||||
pub use db::{RemoteDatabaseOptions, RemoteDatabaseOptionsBuilder};
|
||||
pub use oauth::{OAuthConfig, OAuthFlow, OAuthHeaderProvider};
|
||||
|
||||
906
rust/lancedb/src/remote/oauth.rs
Normal file
906
rust/lancedb/src/remote/oauth.rs
Normal file
@@ -0,0 +1,906 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use log::{debug, info, warn};
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::remote::client::HeaderProvider;
|
||||
|
||||
const DEFAULT_REFRESH_BUFFER_SECS: u64 = 300;
|
||||
const DEFAULT_CALLBACK_PORT: u16 = 8400;
|
||||
const AZURE_IMDS_ENDPOINT: &str = "http://169.254.169.254/metadata/identity/oauth2/token";
|
||||
const AZURE_IMDS_API_VERSION: &str = "2018-02-01";
|
||||
|
||||
/// OAuth authentication flow configuration.
///
/// Selects how [`OAuthHeaderProvider`] obtains an access token. Only the
/// interactive flows (`AuthorizationCodePKCE` and `DeviceCode`) reuse a
/// stored refresh token; the other flows always request a fresh token.
#[derive(Debug, Clone)]
pub enum OAuthFlow {
    /// Client Credentials grant (service-to-service / M2M).
    /// Requires `client_secret` in [`OAuthConfig`]; this is checked when
    /// the provider is constructed.
    ClientCredentials,

    /// Authorization Code with PKCE (interactive browser-based auth).
    /// A local HTTP listener on `127.0.0.1` receives the redirect.
    AuthorizationCodePKCE {
        /// Redirect URI (default: `http://localhost:{callback_port}/callback`)
        redirect_uri: Option<String>,
        /// Port for the local HTTP callback server (default: 8400)
        callback_port: Option<u16>,
    },

    /// Device Code grant (CLI / headless environments).
    /// Displays a verification URL and user code for out-of-band authentication.
    DeviceCode,

    /// Azure Managed Identity via IMDS.
    /// Works on Azure VMs, AKS, App Service, and Azure Functions.
    AzureManagedIdentity {
        /// Client ID for user-assigned managed identity.
        /// Omit for system-assigned managed identity.
        client_id: Option<String>,
    },

    /// Workload Identity Federation.
    /// Exchanges a platform token (K8s service account, GitHub OIDC) for an IdP token.
    WorkloadIdentity {
        /// Path to the federated token file
        /// (e.g. `AZURE_FEDERATED_TOKEN_FILE`).
        token_file: String,
    },
}
|
||||
|
||||
/// OAuth configuration for LanceDB authentication.
///
/// All token acquisition and refresh is handled in the Rust layer.
/// Python and TypeScript bindings expose this as a plain config object.
#[derive(Debug, Clone)]
pub struct OAuthConfig {
    /// OIDC issuer URL or OAuth authority URL.
    /// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
    ///
    /// The discovery document is fetched from
    /// `{issuer_url}/.well-known/openid-configuration` (trailing `/` on the
    /// issuer is ignored).
    pub issuer_url: String,

    /// Application / Client ID.
    pub client_id: String,

    /// Client secret (required for `ClientCredentials`, optional for others).
    pub client_secret: Option<String>,

    /// OAuth scopes to request. Must not be empty; the scopes are joined
    /// with single spaces when sent to the identity provider.
    /// For Azure: `["api://{app_id}/.default"]`
    pub scopes: Vec<String>,

    /// Authentication flow to use.
    pub flow: OAuthFlow,

    /// Seconds before token expiry to trigger proactive refresh (default: 300).
    pub refresh_buffer_secs: Option<u64>,
}
|
||||
|
||||
// -- OIDC Discovery --
|
||||
|
||||
/// The subset of the OIDC discovery document
/// (`.well-known/openid-configuration`) that this module reads.
#[derive(Debug, Deserialize)]
struct OidcDiscovery {
    /// Endpoint that token requests are POSTed to.
    token_endpoint: String,
    /// Required by the Authorization Code + PKCE flow; that flow fails if
    /// the provider does not advertise it.
    authorization_endpoint: Option<String>,
    /// Required by the Device Code flow; that flow fails if the provider
    /// does not advertise it.
    device_authorization_endpoint: Option<String>,
}
|
||||
|
||||
// -- Token Response --
|
||||
|
||||
/// Token endpoint response body, limited to the fields this module uses.
#[derive(Debug, Deserialize)]
struct TokenResponse {
    /// The bearer token handed back to callers.
    access_token: String,
    /// Optional refresh token. When absent, a previously stored refresh
    /// token is kept rather than cleared.
    #[serde(default)]
    refresh_token: Option<String>,
    /// Token lifetime in seconds.
    /// Some providers (Azure IMDS) return this as a string, so we accept both.
    /// When absent, the token is treated as having no known expiry.
    #[serde(default, deserialize_with = "deserialize_optional_u64_or_string")]
    expires_in: Option<u64>,
    /// Parsed but not read by this module, hence the `dead_code` allowance.
    #[serde(default)]
    #[allow(dead_code)]
    token_type: Option<String>,
}
|
||||
|
||||
fn deserialize_optional_u64_or_string<'de, D>(
|
||||
deserializer: D,
|
||||
) -> std::result::Result<Option<u64>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
use serde::de;
|
||||
|
||||
struct U64OrString;
|
||||
impl<'de> de::Visitor<'de> for U64OrString {
|
||||
type Value = Option<u64>;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("a u64, a numeric string, or null")
|
||||
}
|
||||
|
||||
fn visit_u64<E: de::Error>(self, v: u64) -> std::result::Result<Self::Value, E> {
|
||||
Ok(Some(v))
|
||||
}
|
||||
|
||||
fn visit_i64<E: de::Error>(self, v: i64) -> std::result::Result<Self::Value, E> {
|
||||
Ok(Some(v as u64))
|
||||
}
|
||||
|
||||
fn visit_str<E: de::Error>(self, v: &str) -> std::result::Result<Self::Value, E> {
|
||||
v.parse::<u64>().map(Some).map_err(de::Error::custom)
|
||||
}
|
||||
|
||||
fn visit_none<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn visit_unit<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_any(U64OrString)
|
||||
}
|
||||
|
||||
// -- Device Code Response --
|
||||
|
||||
/// Response body of the device authorization request.
///
/// NOTE(review): the field names match the RFC 8628 device authorization
/// response; how each field is consumed is outside this view — confirm
/// against `acquire_device_code`.
#[derive(Debug, Deserialize)]
struct DeviceCodeResponse {
    device_code: String,
    user_code: String,
    verification_uri: String,
    // Optional in the response; defaults to `None` when missing.
    #[serde(default)]
    verification_uri_complete: Option<String>,
    // Required: deserialization fails if the provider omits it.
    expires_in: u64,
    // presumably the polling interval in seconds — TODO confirm
    interval: Option<u64>,
}
|
||||
|
||||
// -- Internal Token State --
|
||||
|
||||
struct TokenState {
|
||||
access_token: Option<String>,
|
||||
refresh_token: Option<String>,
|
||||
expires_at: Option<Instant>,
|
||||
}
|
||||
|
||||
impl TokenState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
access_token: None,
|
||||
refresh_token: None,
|
||||
expires_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_expired(&self, buffer: Duration) -> bool {
|
||||
match (self.access_token.as_ref(), self.expires_at) {
|
||||
(Some(_), Some(expires_at)) => Instant::now() + buffer >= expires_at,
|
||||
(None, _) => true,
|
||||
(Some(_), None) => false, // no expiry info, assume valid
|
||||
}
|
||||
}
|
||||
|
||||
fn update(&mut self, resp: &TokenResponse) {
|
||||
self.access_token = Some(resp.access_token.clone());
|
||||
if resp.refresh_token.is_some() {
|
||||
self.refresh_token = resp.refresh_token.clone();
|
||||
}
|
||||
self.expires_at = resp
|
||||
.expires_in
|
||||
.map(|secs| Instant::now() + Duration::from_secs(secs));
|
||||
}
|
||||
}
|
||||
|
||||
/// OAuth header provider that manages the full token lifecycle.
|
||||
///
|
||||
/// Implements [`HeaderProvider`] to inject `Authorization: Bearer <token>`
|
||||
/// headers into every LanceDB request, with automatic token refresh.
|
||||
pub struct OAuthHeaderProvider {
|
||||
config: OAuthConfig,
|
||||
http_client: Client,
|
||||
token_state: Arc<RwLock<TokenState>>,
|
||||
/// Cached OIDC discovery document
|
||||
discovery: Arc<RwLock<Option<OidcDiscovery>>>,
|
||||
refresh_buffer: Duration,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OAuthHeaderProvider {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("OAuthHeaderProvider")
|
||||
.field("issuer_url", &self.config.issuer_url)
|
||||
.field("client_id", &self.config.client_id)
|
||||
.field("flow", &self.config.flow)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl OAuthHeaderProvider {
|
||||
/// Create a new OAuth header provider from configuration.
|
||||
pub fn new(config: OAuthConfig) -> Result<Self> {
|
||||
// Validate config upfront
|
||||
if matches!(config.flow, OAuthFlow::ClientCredentials) && config.client_secret.is_none() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "client_secret is required for ClientCredentials flow".to_string(),
|
||||
});
|
||||
}
|
||||
if config.scopes.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "At least one OAuth scope is required".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let http_client = Client::builder()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to create HTTP client for OAuth: {e}"),
|
||||
})?;
|
||||
|
||||
let refresh_buffer = Duration::from_secs(
|
||||
config
|
||||
.refresh_buffer_secs
|
||||
.unwrap_or(DEFAULT_REFRESH_BUFFER_SECS),
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
http_client,
|
||||
token_state: Arc::new(RwLock::new(TokenState::new())),
|
||||
discovery: Arc::new(RwLock::new(None)),
|
||||
refresh_buffer,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a valid access token, refreshing if necessary.
|
||||
async fn get_valid_token(&self) -> Result<String> {
|
||||
// Fast path: check if current token is still valid
|
||||
{
|
||||
let state = self.token_state.read().await;
|
||||
if !state.is_expired(self.refresh_buffer)
|
||||
&& let Some(ref token) = state.access_token
|
||||
{
|
||||
return Ok(token.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path: acquire or refresh token
|
||||
let mut state = self.token_state.write().await;
|
||||
|
||||
// Double-check after acquiring write lock
|
||||
if !state.is_expired(self.refresh_buffer)
|
||||
&& let Some(ref token) = state.access_token
|
||||
{
|
||||
return Ok(token.clone());
|
||||
}
|
||||
|
||||
let uses_refresh_token = !matches!(
|
||||
self.config.flow,
|
||||
OAuthFlow::ClientCredentials
|
||||
| OAuthFlow::AzureManagedIdentity { .. }
|
||||
| OAuthFlow::WorkloadIdentity { .. }
|
||||
);
|
||||
|
||||
let resp = if let Some(ref refresh_token) = state.refresh_token
|
||||
&& uses_refresh_token
|
||||
{
|
||||
debug!("Refreshing OAuth token using refresh_token");
|
||||
self.refresh_with_token(refresh_token).await?
|
||||
} else {
|
||||
debug!("Acquiring new OAuth token via {:?} flow", self.config.flow);
|
||||
self.acquire_token().await?
|
||||
};
|
||||
|
||||
state.update(&resp);
|
||||
Ok(resp.access_token)
|
||||
}
|
||||
|
||||
/// Acquire a new token using the configured flow.
|
||||
async fn acquire_token(&self) -> Result<TokenResponse> {
|
||||
match &self.config.flow {
|
||||
OAuthFlow::ClientCredentials => self.acquire_client_credentials().await,
|
||||
OAuthFlow::AuthorizationCodePKCE {
|
||||
redirect_uri,
|
||||
callback_port,
|
||||
} => {
|
||||
self.acquire_authorization_code_pkce(
|
||||
redirect_uri.as_deref(),
|
||||
callback_port.unwrap_or(DEFAULT_CALLBACK_PORT),
|
||||
)
|
||||
.await
|
||||
}
|
||||
OAuthFlow::DeviceCode => self.acquire_device_code().await,
|
||||
OAuthFlow::AzureManagedIdentity { client_id } => {
|
||||
self.acquire_managed_identity(client_id.as_deref()).await
|
||||
}
|
||||
OAuthFlow::WorkloadIdentity { token_file } => {
|
||||
self.acquire_workload_identity(token_file).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -- OIDC Discovery --
|
||||
|
||||
async fn get_discovery(&self) -> Result<OidcDiscovery> {
|
||||
{
|
||||
let cached = self.discovery.read().await;
|
||||
if let Some(ref disc) = *cached {
|
||||
return Ok(OidcDiscovery {
|
||||
token_endpoint: disc.token_endpoint.clone(),
|
||||
authorization_endpoint: disc.authorization_endpoint.clone(),
|
||||
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut cache = self.discovery.write().await;
|
||||
// Double-check
|
||||
if let Some(ref disc) = *cache {
|
||||
return Ok(OidcDiscovery {
|
||||
token_endpoint: disc.token_endpoint.clone(),
|
||||
authorization_endpoint: disc.authorization_endpoint.clone(),
|
||||
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
let discovery_url = format!(
|
||||
"{}/.well-known/openid-configuration",
|
||||
self.config.issuer_url.trim_end_matches('/')
|
||||
);
|
||||
|
||||
debug!("Fetching OIDC discovery from {}", discovery_url);
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.get(&discovery_url)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to fetch OIDC discovery document: {e}"),
|
||||
})?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"OIDC discovery failed with status {}: {}",
|
||||
resp.status(),
|
||||
resp.text().await.unwrap_or_default()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let disc: OidcDiscovery = resp.json().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse OIDC discovery document: {e}"),
|
||||
})?;
|
||||
|
||||
let result = OidcDiscovery {
|
||||
token_endpoint: disc.token_endpoint.clone(),
|
||||
authorization_endpoint: disc.authorization_endpoint.clone(),
|
||||
device_authorization_endpoint: disc.device_authorization_endpoint.clone(),
|
||||
};
|
||||
|
||||
*cache = Some(disc);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn get_token_endpoint_from_issuer(&self) -> String {
|
||||
// Derive Azure v2.0 token endpoint from issuer URL
|
||||
// issuer: https://login.microsoftonline.com/{tenant}/v2.0
|
||||
// token: https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token
|
||||
let base = self.config.issuer_url.trim_end_matches("/v2.0");
|
||||
format!("{base}/oauth2/v2.0/token")
|
||||
}
|
||||
|
||||
async fn get_token_endpoint(&self) -> Result<String> {
|
||||
match self.get_discovery().await {
|
||||
Ok(disc) => Ok(disc.token_endpoint),
|
||||
Err(e) => {
|
||||
warn!("OIDC discovery failed, falling back to derived endpoint: {e}");
|
||||
Ok(self.get_token_endpoint_from_issuer())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn scopes_string(&self) -> String {
|
||||
self.config.scopes.join(" ")
|
||||
}
|
||||
|
||||
// -- Client Credentials Flow --
|
||||
|
||||
async fn acquire_client_credentials(&self) -> Result<TokenResponse> {
|
||||
let client_secret = self
|
||||
.config
|
||||
.client_secret
|
||||
.as_ref()
|
||||
.ok_or(Error::InvalidInput {
|
||||
message: "client_secret is required for ClientCredentials flow".to_string(),
|
||||
})?;
|
||||
|
||||
let token_endpoint = self.get_token_endpoint().await?;
|
||||
|
||||
let params = [
|
||||
("grant_type", "client_credentials"),
|
||||
("client_id", &self.config.client_id),
|
||||
("client_secret", client_secret),
|
||||
("scope", &self.scopes_string()),
|
||||
];
|
||||
|
||||
self.post_token_request(&token_endpoint, ¶ms).await
|
||||
}
|
||||
|
||||
// -- Authorization Code + PKCE Flow --
|
||||
|
||||
async fn acquire_authorization_code_pkce(
|
||||
&self,
|
||||
redirect_uri: Option<&str>,
|
||||
callback_port: u16,
|
||||
) -> Result<TokenResponse> {
|
||||
use rand::Rng;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
let discovery = self.get_discovery().await?;
|
||||
let auth_endpoint = discovery.authorization_endpoint.ok_or(Error::Runtime {
|
||||
message: "OIDC discovery did not provide authorization_endpoint".to_string(),
|
||||
})?;
|
||||
|
||||
let default_redirect = format!("http://localhost:{callback_port}/callback");
|
||||
let redirect = redirect_uri.unwrap_or(&default_redirect);
|
||||
|
||||
// Generate PKCE code verifier and challenge (S256)
|
||||
const PKCE_CHARSET: &[u8] =
|
||||
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~";
|
||||
let code_verifier: String = {
|
||||
let mut rng = rand::rng();
|
||||
(0..128)
|
||||
.map(|_| {
|
||||
let idx = rng.random_range(0..PKCE_CHARSET.len());
|
||||
PKCE_CHARSET[idx] as char
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
let code_challenge = {
|
||||
use sha2::{Digest, Sha256};
|
||||
let hash = Sha256::digest(code_verifier.as_bytes());
|
||||
base64_url_encode(&hash)
|
||||
};
|
||||
|
||||
let state: String = {
|
||||
let mut rng = rand::rng();
|
||||
(0..32)
|
||||
.map(|_| {
|
||||
let idx = rng.random_range(0..16u8);
|
||||
b"0123456789abcdef"[idx as usize] as char
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// Build authorization URL
|
||||
let auth_url = format!(
|
||||
"{auth_endpoint}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={state}",
|
||||
urlencoding::encode(&self.config.client_id),
|
||||
urlencoding::encode(redirect),
|
||||
urlencoding::encode(&self.scopes_string()),
|
||||
urlencoding::encode(&code_challenge),
|
||||
);
|
||||
|
||||
info!("Opening browser for OAuth login...");
|
||||
info!("If the browser doesn't open, visit: {auth_url}");
|
||||
|
||||
// Try to open browser
|
||||
let _ = open::that(&auth_url);
|
||||
|
||||
// Start local callback server
|
||||
let listener = TcpListener::bind(format!("127.0.0.1:{callback_port}"))
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to bind callback server on port {callback_port}: {e}"),
|
||||
})?;
|
||||
|
||||
info!("Waiting for OAuth callback on port {callback_port}...");
|
||||
|
||||
let (mut stream, _) = listener.accept().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to accept callback connection: {e}"),
|
||||
})?;
|
||||
|
||||
// Read the HTTP request
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read callback request: {e}"),
|
||||
})?;
|
||||
let request_str = String::from_utf8_lossy(&buf[..n]);
|
||||
|
||||
// Extract authorization code from query params
|
||||
let code = extract_query_param(&request_str, "code").ok_or(Error::Runtime {
|
||||
message: "No authorization code in callback".to_string(),
|
||||
})?;
|
||||
|
||||
let returned_state = extract_query_param(&request_str, "state");
|
||||
if returned_state.as_deref() != Some(&state) {
|
||||
return Err(Error::Runtime {
|
||||
message: "OAuth state mismatch — possible CSRF attack".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Send success response to browser
|
||||
let response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<html><body><h2>Authentication successful!</h2><p>You can close this window.</p></body></html>";
|
||||
let _ = stream.write_all(response.as_bytes()).await;
|
||||
|
||||
// Exchange code for tokens
|
||||
let token_endpoint = self.get_token_endpoint().await?;
|
||||
let mut params = vec![
|
||||
("grant_type", "authorization_code"),
|
||||
("client_id", self.config.client_id.as_str()),
|
||||
("code", &code),
|
||||
("redirect_uri", redirect),
|
||||
("code_verifier", &code_verifier),
|
||||
];
|
||||
if let Some(ref secret) = self.config.client_secret {
|
||||
params.push(("client_secret", secret));
|
||||
}
|
||||
|
||||
self.post_token_request(&token_endpoint, ¶ms).await
|
||||
}
|
||||
|
||||
// -- Device Code Flow --
|
||||
|
||||
async fn acquire_device_code(&self) -> Result<TokenResponse> {
|
||||
let discovery = self.get_discovery().await?;
|
||||
let device_endpoint = discovery
|
||||
.device_authorization_endpoint
|
||||
.ok_or(Error::Runtime {
|
||||
message: "OIDC discovery did not provide device_authorization_endpoint".to_string(),
|
||||
})?;
|
||||
|
||||
let params = [
|
||||
("client_id", self.config.client_id.as_str()),
|
||||
("scope", &self.scopes_string()),
|
||||
];
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.post(&device_endpoint)
|
||||
.form(¶ms)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Device code request failed: {e}"),
|
||||
})?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Device code request failed with status {}: {}",
|
||||
resp.status(),
|
||||
resp.text().await.unwrap_or_default()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let device_resp: DeviceCodeResponse = resp.json().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse device code response: {e}"),
|
||||
})?;
|
||||
|
||||
// Display instructions to user
|
||||
info!(
|
||||
"To sign in, visit {} and enter code: {}",
|
||||
device_resp.verification_uri, device_resp.user_code
|
||||
);
|
||||
if let Some(ref uri) = device_resp.verification_uri_complete {
|
||||
info!("Or visit: {uri}");
|
||||
}
|
||||
|
||||
// Poll token endpoint
|
||||
let token_endpoint = self.get_token_endpoint().await?;
|
||||
let poll_interval = Duration::from_secs(device_resp.interval.unwrap_or(5));
|
||||
let deadline = Instant::now() + Duration::from_secs(device_resp.expires_in);
|
||||
|
||||
loop {
|
||||
if Instant::now() >= deadline {
|
||||
return Err(Error::Runtime {
|
||||
message: "Device code flow timed out waiting for user authentication"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
|
||||
let poll_params = [
|
||||
("grant_type", "urn:ietf:params:oauth:grant-type:device_code"),
|
||||
("client_id", self.config.client_id.as_str()),
|
||||
("device_code", &device_resp.device_code),
|
||||
];
|
||||
|
||||
let poll_resp = self
|
||||
.http_client
|
||||
.post(&token_endpoint)
|
||||
.form(&poll_params)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Device code poll failed: {e}"),
|
||||
})?;
|
||||
|
||||
if poll_resp.status().is_success() {
|
||||
return poll_resp.json().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse token response: {e}"),
|
||||
});
|
||||
}
|
||||
|
||||
// Check for pending / slow_down errors
|
||||
let body = poll_resp.text().await.unwrap_or_default();
|
||||
if body.contains("authorization_pending") {
|
||||
continue;
|
||||
}
|
||||
if body.contains("slow_down") {
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
return Err(Error::Runtime {
|
||||
message: format!("Device code poll failed: {body}"),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// -- Azure Managed Identity Flow --
|
||||
|
||||
async fn acquire_managed_identity(&self, mi_client_id: Option<&str>) -> Result<TokenResponse> {
|
||||
let resource = self.scopes_string().replace("/.default", "");
|
||||
|
||||
let mut url = format!(
|
||||
"{AZURE_IMDS_ENDPOINT}?api-version={AZURE_IMDS_API_VERSION}&resource={}",
|
||||
urlencoding::encode(&resource),
|
||||
);
|
||||
if let Some(cid) = mi_client_id {
|
||||
url.push_str(&format!("&client_id={}", urlencoding::encode(cid)));
|
||||
}
|
||||
|
||||
let resp = self
|
||||
.http_client
|
||||
.get(&url)
|
||||
.header("Metadata", "true")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Azure IMDS request failed: {e}"),
|
||||
})?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Azure IMDS returned status {}: {}",
|
||||
resp.status(),
|
||||
resp.text().await.unwrap_or_default()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
resp.json().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse IMDS token response: {e}"),
|
||||
})
|
||||
}
|
||||
|
||||
// -- Workload Identity Federation Flow --
|
||||
|
||||
async fn acquire_workload_identity(&self, token_file: &str) -> Result<TokenResponse> {
|
||||
let federated_token =
|
||||
tokio::fs::read_to_string(token_file)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to read federated token file '{token_file}': {e}"),
|
||||
})?;
|
||||
|
||||
let token_endpoint = self.get_token_endpoint().await?;
|
||||
|
||||
let params = [
|
||||
("grant_type", "client_credentials"),
|
||||
("client_id", self.config.client_id.as_str()),
|
||||
(
|
||||
"client_assertion_type",
|
||||
"urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||
),
|
||||
("client_assertion", federated_token.trim()),
|
||||
("scope", &self.scopes_string()),
|
||||
];
|
||||
|
||||
self.post_token_request(&token_endpoint, ¶ms).await
|
||||
}
|
||||
|
||||
// -- Refresh Token Flow --
|
||||
|
||||
/// Redeem `refresh_token` at the token endpoint for a fresh token response.
///
/// The configured client secret, when present, is sent as `client_secret`.
async fn refresh_with_token(&self, refresh_token: &str) -> Result<TokenResponse> {
    let token_endpoint = self.get_token_endpoint().await?;

    let required = [
        ("grant_type", "refresh_token"),
        ("client_id", self.config.client_id.as_str()),
        ("refresh_token", refresh_token),
    ];
    // Yields zero or one extra pair, appended after the required ones.
    let secret = self
        .config
        .client_secret
        .as_deref()
        .map(|secret| ("client_secret", secret));
    let params: Vec<(&str, &str)> = required.into_iter().chain(secret).collect();

    self.post_token_request(&token_endpoint, &params).await
}
|
||||
|
||||
// -- Shared Helpers --
|
||||
|
||||
async fn post_token_request(
|
||||
&self,
|
||||
endpoint: &str,
|
||||
params: &[(&str, &str)],
|
||||
) -> Result<TokenResponse> {
|
||||
let resp = self
|
||||
.http_client
|
||||
.post(endpoint)
|
||||
.form(params)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Token request to {endpoint} failed: {e}"),
|
||||
})?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Token request failed with status {}: {}",
|
||||
resp.status(),
|
||||
resp.text().await.unwrap_or_default()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
resp.json().await.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to parse token response: {e}"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HeaderProvider for OAuthHeaderProvider {
|
||||
async fn get_headers(&self) -> Result<HashMap<String, String>> {
|
||||
let token = self.get_valid_token().await?;
|
||||
Ok(HashMap::from([(
|
||||
"authorization".to_string(),
|
||||
format!("Bearer {token}"),
|
||||
)]))
|
||||
}
|
||||
}
|
||||
|
||||
// -- Utility functions --
|
||||
|
||||
fn base64_url_encode(input: &[u8]) -> String {
|
||||
use base64::Engine;
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(input)
|
||||
}
|
||||
|
||||
/// Extract a query parameter value from a raw HTTP GET request line.
|
||||
fn extract_query_param(request: &str, param: &str) -> Option<String> {
|
||||
let first_line = request.lines().next()?;
|
||||
let path = first_line.split_whitespace().nth(1)?;
|
||||
let query = path.split('?').nth(1)?;
|
||||
for pair in query.split('&') {
|
||||
let mut kv = pair.splitn(2, '=');
|
||||
if let (Some(key), Some(value)) = (kv.next(), kv.next())
|
||||
&& key == param
|
||||
{
|
||||
return Some(urlencoding::decode(value).ok()?.into_owned());
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an [`OAuthConfig`] with `refresh_buffer_secs` left unset.
    fn config(
        issuer_url: &str,
        client_id: &str,
        client_secret: Option<&str>,
        scopes: &[&str],
        flow: OAuthFlow,
    ) -> OAuthConfig {
        OAuthConfig {
            issuer_url: issuer_url.to_string(),
            client_id: client_id.to_string(),
            client_secret: client_secret.map(String::from),
            scopes: scopes.iter().map(|scope| scope.to_string()).collect(),
            flow,
            refresh_buffer_secs: None,
        }
    }

    #[test]
    fn test_extract_query_param() {
        let request = "GET /callback?code=abc123&state=xyz HTTP/1.1\r\nHost: localhost\r\n";

        assert_eq!(
            extract_query_param(request, "code").as_deref(),
            Some("abc123")
        );
        assert_eq!(extract_query_param(request, "state").as_deref(), Some("xyz"));
        assert!(extract_query_param(request, "missing").is_none());
    }

    #[test]
    fn test_extract_query_param_encoded() {
        let request = "GET /callback?code=abc%20123&state=x%26y HTTP/1.1\r\n";

        assert_eq!(
            extract_query_param(request, "code").as_deref(),
            Some("abc 123")
        );
        assert_eq!(extract_query_param(request, "state").as_deref(), Some("x&y"));
    }

    #[test]
    fn test_token_state_expiry() {
        // A state without a token counts as expired even with a zero buffer.
        let mut state = TokenState::new();
        assert!(state.is_expired(Duration::from_secs(0)));

        // With 600s of validity left, a 300s buffer is fine but a 601s buffer is not.
        state.access_token = Some("tok".to_string());
        state.expires_at = Some(Instant::now() + Duration::from_secs(600));
        assert!(!state.is_expired(Duration::from_secs(300)));
        assert!(state.is_expired(Duration::from_secs(601)));
    }

    #[test]
    fn test_base64_url_encode() {
        let encoded = base64_url_encode(b"hello world");

        // None of the standard-alphabet or padding characters may appear.
        for forbidden in ['+', '/', '='] {
            assert!(!encoded.contains(forbidden));
        }
    }

    #[test]
    fn test_scopes_string() {
        let provider = OAuthHeaderProvider::new(config(
            "https://login.microsoftonline.com/tenant/v2.0",
            "app-id",
            Some("secret"),
            &["scope1", "scope2"],
            OAuthFlow::ClientCredentials,
        ))
        .unwrap();

        assert_eq!(provider.scopes_string(), "scope1 scope2");
    }

    #[test]
    fn test_token_endpoint_derivation() {
        let provider = OAuthHeaderProvider::new(config(
            "https://login.microsoftonline.com/my-tenant/v2.0",
            "id",
            None,
            &["api://test/.default"],
            OAuthFlow::DeviceCode,
        ))
        .unwrap();

        assert_eq!(
            provider.get_token_endpoint_from_issuer(),
            "https://login.microsoftonline.com/my-tenant/oauth2/v2.0/token"
        );
    }

    #[test]
    fn test_client_credentials_requires_secret() {
        let without_secret = config(
            "https://login.microsoftonline.com/tenant/v2.0",
            "app-id",
            None,
            &["scope"],
            OAuthFlow::ClientCredentials,
        );

        assert!(OAuthHeaderProvider::new(without_secret).is_err());
    }

    #[test]
    fn test_empty_scopes_rejected() {
        let without_scopes = config(
            "https://login.microsoftonline.com/tenant/v2.0",
            "app-id",
            None,
            &[],
            OAuthFlow::DeviceCode,
        );

        assert!(OAuthHeaderProvider::new(without_scopes).is_err());
    }
}
|
||||
@@ -518,6 +518,21 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(order_by) = ¶ms.order_by {
|
||||
body["order_by"] = serde_json::Value::Array(
|
||||
order_by
|
||||
.iter()
|
||||
.map(|o| {
|
||||
serde_json::json!({
|
||||
"column_name": o.column_name,
|
||||
"ascending": o.ascending,
|
||||
"nulls_first": o.nulls_first,
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1652,6 +1667,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
// Returns a boxed `RemoteTags` that borrows this table as its `inner` handle.
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
@@ -2078,7 +2111,7 @@ mod tests {
|
||||
use crate::{
|
||||
DistanceType, Error, Table,
|
||||
index::{Index, IndexStatistics, IndexType, vector::IvfPqIndexBuilder},
|
||||
query::{ExecutableQuery, QueryBase},
|
||||
query::{ColumnOrdering, ExecutableQuery, QueryBase},
|
||||
remote::ARROW_FILE_CONTENT_TYPE,
|
||||
};
|
||||
|
||||
@@ -2988,6 +3021,18 @@ mod tests {
|
||||
"distance_type": "cosine",
|
||||
"bypass_vector_index": true,
|
||||
"columns": ["a", "b"],
|
||||
"order_by": [
|
||||
{
|
||||
"column_name": "score",
|
||||
"ascending": false,
|
||||
"nulls_first": true,
|
||||
},
|
||||
{
|
||||
"column_name": "id",
|
||||
"ascending": true,
|
||||
"nulls_first": false,
|
||||
}
|
||||
],
|
||||
"nprobes": 12,
|
||||
"minimum_nprobes": 12,
|
||||
"maximum_nprobes": 12,
|
||||
@@ -3019,6 +3064,10 @@ mod tests {
|
||||
.limit(42)
|
||||
.offset(10)
|
||||
.select(Select::columns(&["a", "b"]))
|
||||
.order_by(Some(vec![
|
||||
ColumnOrdering::desc_nulls_first("score".to_string()),
|
||||
ColumnOrdering::asc_nulls_last("id".to_string()),
|
||||
]))
|
||||
.nearest_to(vec![0.1, 0.2, 0.3])
|
||||
.unwrap()
|
||||
.column("my_vector")
|
||||
|
||||
@@ -74,6 +74,7 @@ pub(crate) mod dataset;
|
||||
pub mod delete;
|
||||
pub mod merge;
|
||||
pub mod optimize;
|
||||
mod primary_key;
|
||||
pub mod query;
|
||||
pub mod schema_evolution;
|
||||
pub mod update;
|
||||
@@ -272,6 +273,176 @@ pub trait Tags: Send + Sync {
|
||||
|
||||
pub use self::merge::MergeResult;
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `merge_insert`.
|
||||
///
|
||||
/// Construct via [`LsmWriteSpec::bucket`], [`LsmWriteSpec::identity`], or
|
||||
/// [`LsmWriteSpec::unsharded`], then optionally chain
|
||||
/// [`LsmWriteSpec::with_maintained_indexes`] (indexes the MemWAL keeps up to
|
||||
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
|
||||
/// `ShardWriter` configuration recorded in the MemWAL index).
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key.
|
||||
///
|
||||
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
|
||||
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
|
||||
/// onto the MemWAL writer is a follow-up.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
///
|
||||
/// `column` must equal the table's currently-set single-column
|
||||
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
|
||||
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
|
||||
/// `bucket(column, num_buckets)` value is stable across processes.
|
||||
Bucket {
|
||||
column: String,
|
||||
num_buckets: u32,
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
///
|
||||
/// Use this when the data is already partitioned by `column`; each
|
||||
/// distinct value of `column` becomes its own shard.
|
||||
Identity {
|
||||
column: String,
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
/// No sharding — every `merge_insert` call writes to a single MemWAL shard.
|
||||
Unsharded {
|
||||
/// Names of indexes (already created on the table) that the
|
||||
/// MemWAL should maintain in-memory as rows are appended.
|
||||
maintained_indexes: Vec<String>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
writer_config_defaults: HashMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl LsmWriteSpec {
|
||||
/// Construct a hash-bucket sharding spec with no maintained indexes.
|
||||
pub fn bucket(column: impl Into<String>, num_buckets: u32) -> Self {
|
||||
Self::Bucket {
|
||||
column: column.into(),
|
||||
num_buckets,
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct an identity-sharding spec (shard by the raw value of
|
||||
/// `column`) with no maintained indexes.
|
||||
pub fn identity(column: impl Into<String>) -> Self {
|
||||
Self::Identity {
|
||||
column: column.into(),
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct an unsharded spec with no maintained indexes.
|
||||
pub fn unsharded() -> Self {
|
||||
Self::Unsharded {
|
||||
maintained_indexes: Vec::new(),
|
||||
writer_config_defaults: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the list of indexes the MemWAL should keep up to date as
|
||||
/// rows are appended. Each name must reference an index that already
|
||||
/// exists on the table at the time `set_lsm_write_spec` is called.
|
||||
pub fn with_maintained_indexes<I, S>(mut self, indexes: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
let v: Vec<String> = indexes.into_iter().map(Into::into).collect();
|
||||
match &mut self {
|
||||
Self::Bucket {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Identity {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
maintained_indexes, ..
|
||||
} => *maintained_indexes = v,
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Replace the default `ShardWriter` configuration recorded in the MemWAL
|
||||
/// index, so every writer starts from the same defaults. Keys are
|
||||
/// `ShardWriter` config field names (`Duration` knobs use a `_ms` suffix);
|
||||
/// values are their string encodings.
|
||||
pub fn with_writer_config_defaults<I, K, V>(mut self, defaults: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = (K, V)>,
|
||||
K: Into<String>,
|
||||
V: Into<String>,
|
||||
{
|
||||
let m: HashMap<String, String> = defaults
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.into(), v.into()))
|
||||
.collect();
|
||||
match &mut self {
|
||||
Self::Bucket {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Identity {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
writer_config_defaults,
|
||||
..
|
||||
} => *writer_config_defaults = m,
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Borrow the list of index names this spec asks MemWAL to maintain.
|
||||
pub fn maintained_indexes(&self) -> &[String] {
|
||||
match self {
|
||||
Self::Bucket {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Identity {
|
||||
maintained_indexes, ..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
maintained_indexes, ..
|
||||
} => maintained_indexes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow the default `ShardWriter` configuration recorded by this spec.
|
||||
pub fn writer_config_defaults(&self) -> &HashMap<String, String> {
|
||||
match self {
|
||||
Self::Bucket {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Identity {
|
||||
writer_config_defaults,
|
||||
..
|
||||
}
|
||||
| Self::Unsharded {
|
||||
writer_config_defaults,
|
||||
..
|
||||
} => writer_config_defaults,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for anything "table-like". This is used for both native tables (which target
|
||||
/// Lance datasets) and remote tables (which target LanceDB cloud)
|
||||
///
|
||||
@@ -345,6 +516,43 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<MergeResult>;
|
||||
/// Set the unenforced primary key for the table to a single column.
|
||||
///
|
||||
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
/// column is recorded in the schema as the primary key for use by
|
||||
/// features such as `merge_insert`. Only single-column primary keys are
|
||||
/// supported, and the key cannot be changed once set.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`; table types
|
||||
/// backed by a Lance dataset override it.
|
||||
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_unenforced_primary_key is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Install an [`LsmWriteSpec`] on this table.
|
||||
///
|
||||
/// The spec selects Lance's MemWAL LSM-style write path for future
|
||||
/// `merge_insert` calls.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`. Implementations
|
||||
/// that support the MemWAL LSM write path must override this.
|
||||
async fn set_lsm_write_spec(&self, _spec: LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Remove the [`LsmWriteSpec`] from this table.
|
||||
///
|
||||
/// This is a no-op if no spec is currently set.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`. Implementations
|
||||
/// that support the MemWAL LSM write path must override this.
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
@@ -1063,6 +1271,68 @@ impl Table {
|
||||
self.inner.drop_columns(columns).await
|
||||
}
|
||||
|
||||
/// Set the unenforced primary key for this table to a single column.
|
||||
///
|
||||
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
/// column is recorded in the schema as the primary key so that features
|
||||
/// such as `merge_insert` can use it.
|
||||
///
|
||||
/// Only single-column primary keys are supported, and the key cannot be
|
||||
/// changed once set — calling this on a table that already has an
|
||||
/// unenforced primary key fails. `columns` is an iterable for binding
|
||||
/// ergonomics but must yield exactly one column:
|
||||
///
|
||||
/// - `table.set_unenforced_primary_key(["id"])`
|
||||
pub async fn set_unenforced_primary_key<I, S>(&self, columns: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
let owned: Vec<String> = columns.into_iter().map(Into::into).collect();
|
||||
let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
|
||||
self.inner.set_unenforced_primary_key(&borrowed).await
|
||||
}
|
||||
|
||||
/// Install an [`LsmWriteSpec`] on this table, selecting Lance's MemWAL
|
||||
/// LSM-style write path for future `merge_insert` calls.
|
||||
///
|
||||
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
|
||||
///
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
|
||||
/// unenforced primary key.
|
||||
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
|
||||
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key
|
||||
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
|
||||
/// requires it to be the single column being bucketed.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::table::{LsmWriteSpec, Table};
|
||||
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// table.set_unenforced_primary_key(["id"]).await?;
|
||||
/// table
|
||||
/// .set_lsm_write_spec(
|
||||
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
|
||||
/// )
|
||||
/// .await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
|
||||
// Thin wrapper: forwards the spec unchanged to the table implementation held in `self.inner`.
self.inner.set_lsm_write_spec(spec).await
|
||||
}
|
||||
|
||||
/// Remove the [`LsmWriteSpec`] from this table, reverting to the standard
|
||||
/// `merge_insert` write path.
|
||||
///
|
||||
/// Errors if no spec is currently set.
|
||||
pub async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
// NOTE(review): the doc comment on this method says it errors when no spec is set,
// while the `BaseTable::unset_lsm_write_spec` docs describe that case as a no-op.
// Confirm the actual behavior of the implementation and align the two comments.
self.inner.unset_lsm_write_spec().await
|
||||
}
|
||||
|
||||
/// Retrieve the version of the table
|
||||
///
|
||||
/// LanceDb supports versioning. Every operation that modifies the table increases
|
||||
@@ -2469,6 +2739,18 @@ impl BaseTable for NativeTable {
|
||||
merge::execute_merge_insert(self, params, new_data).await
|
||||
}
|
||||
|
||||
// Delegates to the `primary_key` submodule implementation.
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
|
||||
primary_key::set_unenforced_primary_key(self, columns).await
|
||||
}
|
||||
|
||||
// Delegates to the `merge::lsm` submodule implementation.
async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> Result<()> {
|
||||
merge::lsm::set_lsm_write_spec(self, spec).await
|
||||
}
|
||||
|
||||
// Delegates to the `merge::lsm` submodule implementation.
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
merge::lsm::unset_lsm_write_spec(self).await
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
// Delegate to the submodule implementation
|
||||
@@ -2770,6 +3052,7 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::Dataset;
|
||||
use lance::io::{ObjectStoreParams, WrappingObjectStore};
|
||||
use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use super::*;
|
||||
@@ -3864,6 +4147,395 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_unenforced_primary_key() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("name", DataType::Utf8, true),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject empty input.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(Vec::<&str>::new())
|
||||
.await
|
||||
.expect_err("empty input should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject compound (multi-column) input.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["id", "name"])
|
||||
.await
|
||||
.expect_err("compound primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject unknown column.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["nonexistent"])
|
||||
.await
|
||||
.expect_err("nonexistent column should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject unsupported dtype (Float64).
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["score"])
|
||||
.await
|
||||
.expect_err("Float64 should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// None of the rejected calls set a primary key.
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
assert!(lance_schema.unenforced_primary_key().is_empty());
|
||||
|
||||
// Happy path: set the primary key to "id".
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
// Position metadata is 1-indexed.
|
||||
assert_eq!(
|
||||
pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION),
|
||||
Some(&"1".to_string())
|
||||
);
|
||||
|
||||
// The primary key is immutable: re-setting it is rejected, whether to
|
||||
// the same column or a different one.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["id"])
|
||||
.await
|
||||
.expect_err("re-setting the same primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["name"])
|
||||
.await
|
||||
.expect_err("changing the primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// The primary key is unchanged after the rejected calls.
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_unenforced_primary_key_concurrent() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
// A long read-consistency interval keeps each handle pinned to the
|
||||
// version it opened, so the second handle commits against a stale
|
||||
// base — the same situation as two processes racing.
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(3600))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
let table_a = conn.open_table("t").execute().await.unwrap();
|
||||
let table_b = conn.open_table("t").execute().await.unwrap();
|
||||
|
||||
// Handle A sets the primary key first.
|
||||
table_a.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
|
||||
// Handle B committed against a stale base that had no primary key, so
|
||||
// its own up-front check did not see A's key. The commit itself must
|
||||
// still fail rather than silently overriding A's primary key. (The
|
||||
// cross-process race on a *different* column is caught by the Lance
|
||||
// commit layer.)
|
||||
let err = table_b
|
||||
.set_unenforced_primary_key(["id"])
|
||||
.await
|
||||
.expect_err("concurrent primary key commit on a stale base should fail");
|
||||
assert!(
|
||||
!matches!(err, Error::InvalidInput { .. }),
|
||||
"expected a commit-time conflict, not an up-front input error: {:?}",
|
||||
err
|
||||
);
|
||||
|
||||
// The committed primary key is exactly what A set — no corruption.
|
||||
let fresh = conn.open_table("t").execute().await.unwrap();
|
||||
let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec() {
|
||||
use arrow_array::StringArray;
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("name", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject when no PK is set.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.expect_err("should reject without PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Set PK, then a mismatched column on the spec must be rejected.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
|
||||
.await
|
||||
.expect_err("should reject column != PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Reject num_buckets out of range.
|
||||
for bad in [0u32, 1025] {
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", bad))
|
||||
.await
|
||||
.expect_err("should reject");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
}
|
||||
|
||||
// Happy path: install spec; verify MemWAL details record it.
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let native_tbl = table.as_native().unwrap();
|
||||
let dataset = native_tbl.dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
assert_eq!(details.num_shards, 4);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let installed = &details.sharding_specs[0];
|
||||
assert_eq!(installed.fields.len(), 1);
|
||||
let f = &installed.fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("bucket"));
|
||||
assert_eq!(
|
||||
f.parameters.get("num_buckets").map(String::as_str),
|
||||
Some("4")
|
||||
);
|
||||
// Bucket parameters must hold only `num_buckets`.
|
||||
assert_eq!(f.parameters.len(), 1);
|
||||
|
||||
// Mutation rejected.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
|
||||
.await
|
||||
.expect_err("mutation should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_unsharded() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Lance's MemWAL still requires *some* unenforced primary key on
|
||||
// the dataset; Unsharded just skips the per-row hashing step.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::unsharded())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
assert_eq!(details.num_shards, 1);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let f = &details.sharding_specs[0].fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("unsharded"));
|
||||
assert!(f.source_ids.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_identity() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("region", DataType::Utf8, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(
|
||||
LsmWriteSpec::identity("region")
|
||||
.with_writer_config_defaults([("durable_write", "false")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
let details = dataset
|
||||
.mem_wal_index_details()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("MemWAL index should be initialized");
|
||||
// Identity sharding records an open-ended shard count.
|
||||
assert_eq!(details.num_shards, 0);
|
||||
assert_eq!(details.sharding_specs.len(), 1);
|
||||
let f = &details.sharding_specs[0].fields[0];
|
||||
assert_eq!(f.transform.as_deref(), Some("identity"));
|
||||
// Writer config defaults round-trip into the MemWAL index.
|
||||
assert_eq!(
|
||||
details
|
||||
.writer_config_defaults
|
||||
.get("durable_write")
|
||||
.map(String::as_str),
|
||||
Some("false")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unset_lsm_write_spec() {
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// unset errors when no spec is set.
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
|
||||
// Install a spec, then unset it.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
|
||||
}
|
||||
|
||||
table.unset_lsm_write_spec().await.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_none());
|
||||
}
|
||||
|
||||
// A second unset errors; a fresh spec can still be installed afterwards.
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 8))
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let dataset = table.as_native().unwrap().dataset.get().await.unwrap();
|
||||
assert!(dataset.mem_wal_index_details().await.unwrap().is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_stats() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
|
||||
@@ -16,6 +16,8 @@ use crate::error::{Error, Result};
|
||||
|
||||
use super::{BaseTable, NativeTable};
|
||||
|
||||
pub(crate) mod lsm;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct MergeResult {
|
||||
// The commit version associated with the operation.
|
||||
|
||||
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! MemWAL LSM write-path spec management.
|
||||
//!
|
||||
//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a
|
||||
//! table, which selects Lance's MemWAL LSM-style write path for future
|
||||
//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual
|
||||
//! `merge_insert` dispatch and writer are a follow-up.
|
||||
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
use lance::index::DatasetIndexExt;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::{LsmWriteSpec, NativeTable};
|
||||
|
||||
// =============================================================================
|
||||
// set_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Install an [`LsmWriteSpec`] on the table.
|
||||
///
|
||||
/// The bucket / unsharded sharding spec is constructed and validated by Lance's
|
||||
/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder).
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_some() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let mut builder = dataset.initialize_mem_wal();
|
||||
let (maintained_indexes, writer_config_defaults) = match spec {
|
||||
LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.bucket_sharding(column, num_buckets);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.identity_sharding(column);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.unsharded();
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
};
|
||||
builder = builder.maintained_indexes(maintained_indexes);
|
||||
for (key, value) in writer_config_defaults {
|
||||
builder = builder.add_writer_config_default(key, value);
|
||||
}
|
||||
builder.execute().await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// unset_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index.
|
||||
///
|
||||
/// Errors if no spec is currently set.
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_none() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset
|
||||
.drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME)
|
||||
.await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
113
rust/lancedb/src/table/primary_key.rs
Normal file
113
rust/lancedb/src/table/primary_key.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Table-level unenforced primary key support.
|
||||
//!
|
||||
//! [`set_unenforced_primary_key`] records a column as the table's primary key
|
||||
//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not
|
||||
//! check uniqueness on write; the key is metadata for features such as
|
||||
//! `merge_insert` to consume.
|
||||
//!
|
||||
//! Only a single-column primary key is supported, and the key cannot be
|
||||
//! changed once set.
|
||||
|
||||
use arrow_schema::DataType;
|
||||
use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
|
||||
/// Set the unenforced primary key on `table` to the single column in `columns`.
|
||||
///
|
||||
/// Fails if `columns` is not exactly one column (compound primary keys are not
|
||||
/// supported), if the column does not exist or has an unsupported dtype, or if
|
||||
/// the table already has an unenforced primary key (changing the primary key
|
||||
/// is not supported).
|
||||
pub(super) async fn set_unenforced_primary_key(
|
||||
table: &NativeTable,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
if columns.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: a column is required".into(),
|
||||
});
|
||||
}
|
||||
if columns.len() > 1 {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: compound primary keys are not supported, got {} columns",
|
||||
columns.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
let column = columns[0];
|
||||
|
||||
let updates = {
|
||||
let dataset = table.dataset.get().await?;
|
||||
let schema = dataset.schema();
|
||||
|
||||
// The primary key is immutable once set. The Lance commit layer is the
|
||||
// source of truth for this (it also covers the concurrent-writer race);
|
||||
// this check just fails fast with a clear message.
|
||||
if !schema.unenforced_primary_key().is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(),
|
||||
});
|
||||
}
|
||||
|
||||
let field = schema.field(column).ok_or_else(|| Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' not found on table",
|
||||
column
|
||||
),
|
||||
})?;
|
||||
if !is_supported_pk_dtype(&field.data_type()) {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary",
|
||||
column,
|
||||
field.data_type()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Position metadata is 1-indexed; `Schema::unenforced_primary_key`
|
||||
// treats position 0 as a legacy "no specific position" fallback.
|
||||
let mut metadata = field.metadata.clone();
|
||||
metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY);
|
||||
metadata.insert(
|
||||
LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
|
||||
"1".to_string(),
|
||||
);
|
||||
vec![(field_id_to_u32(field.id, &field.name)?, metadata)]
|
||||
};
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset.replace_field_metadata(updates).await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn field_id_to_u32(id: i32, name: &str) -> Result<u32> {
|
||||
u32::try_from(id).map_err(|_| Error::Runtime {
|
||||
message: format!(
|
||||
"internal: field '{}' has unexpected negative field id {}",
|
||||
name, id
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
fn is_supported_pk_dtype(dtype: &DataType) -> bool {
|
||||
matches!(
|
||||
dtype,
|
||||
DataType::Int32
|
||||
| DataType::Int64
|
||||
| DataType::Utf8
|
||||
| DataType::LargeUtf8
|
||||
| DataType::Binary
|
||||
| DataType::LargeBinary
|
||||
| DataType::FixedSizeBinary(_)
|
||||
)
|
||||
}
|
||||
@@ -242,6 +242,10 @@ pub async fn create_plan(
|
||||
scanner.disable_scoring_autoprojection();
|
||||
}
|
||||
|
||||
if let Some(order_by) = &query.base.order_by {
|
||||
scanner.order_by(Some(order_by.clone()))?;
|
||||
}
|
||||
|
||||
Ok(scanner.create_plan().await?)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user