mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-23 06:50:40 +00:00
Compare commits
1 Commits
dantasse/u
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dd8defa664 |
5
.github/workflows/java-publish.yml
vendored
5
.github/workflows/java-publish.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
server-username: SONATYPE_USER
|
||||
server-password: SONATYPE_TOKEN
|
||||
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }}
|
||||
gpg-passphrase: MAVEN_GPG_PASSPHRASE
|
||||
gpg-passphrase: ${{ secrets.GPG_PASSPHRASE }}
|
||||
- name: Set git config
|
||||
run: |
|
||||
git config --global user.email "dev+gha@lancedb.com"
|
||||
@@ -58,11 +58,10 @@ jobs:
|
||||
echo "use-agent" >> ~/.gnupg/gpg.conf
|
||||
echo "pinentry-mode loopback" >> ~/.gnupg/gpg.conf
|
||||
export GPG_TTY=$(tty)
|
||||
./mvnw --batch-mode -DskipTests -DpushChanges=false deploy -pl lancedb-core -am -P deploy-to-ossrh
|
||||
./mvnw --batch-mode -DskipTests -DpushChanges=false -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} deploy -pl lancedb-core -am -P deploy-to-ossrh
|
||||
env:
|
||||
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
|
||||
SONATYPE_TOKEN: ${{ secrets.SONATYPE_TOKEN }}
|
||||
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
|
||||
|
||||
report-failure:
|
||||
name: Report Workflow Failure
|
||||
|
||||
1883
Cargo.lock
generated
1883
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
30
Cargo.toml
30
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.0.0-beta.1", default-features = false, "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.1", default-features = false, "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.1", default-features = false, "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.1", "tag" = "v7.0.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
@@ -54,7 +54,7 @@ half = { "version" = "2.7.1", default-features = false, features = [
|
||||
futures = "0"
|
||||
log = "0.4"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
object_store = "0.13.2"
|
||||
object_store = "0.12.0"
|
||||
pin-project = "1.0.7"
|
||||
rand = "0.9"
|
||||
snafu = "0.8"
|
||||
|
||||
34
deny.toml
34
deny.toml
@@ -51,18 +51,6 @@ ignore = [
|
||||
# https://rustsec.org/advisories/RUSTSEC-2024-0436
|
||||
{ id = "RUSTSEC-2024-0436", reason = "transitive via datafusion; awaiting ecosystem migration" },
|
||||
|
||||
# encoding: unmaintained. Reached through lindera-dictionary, which is
|
||||
# required by the native Lindera tokenizer path. Lindera has not migrated
|
||||
# off this crate yet.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2021-0153
|
||||
{ id = "RUSTSEC-2021-0153", reason = "transitive via lindera-dictionary for native Lindera tokenizer" },
|
||||
|
||||
# fast-float: unsound and unmaintained. Reached only through polars-arrow
|
||||
# from the optional Polars integration; replacement requires a Polars
|
||||
# dependency upgrade.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2024-0379
|
||||
{ id = "RUSTSEC-2024-0379", reason = "transitive via polars-arrow; waiting on Polars migration" },
|
||||
|
||||
# tantivy: segfault on malformed input due to missing bounds check.
|
||||
# Pulled in via lance for full-text search. We only feed tantivy
|
||||
# documents we construct ourselves, not attacker-controlled bytes.
|
||||
@@ -80,17 +68,11 @@ ignore = [
|
||||
# https://rustsec.org/advisories/RUSTSEC-2025-0119
|
||||
{ id = "RUSTSEC-2025-0119", reason = "transitive via hf-hub/indicatif; cosmetic formatting crate" },
|
||||
|
||||
# bincode: unmaintained. Reached through lindera and lindera-dictionary,
|
||||
# which are required by the native Lindera tokenizer path. Lindera has not
|
||||
# migrated to another serialization format yet.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2025-0141
|
||||
{ id = "RUSTSEC-2025-0141", reason = "transitive via lindera/lindera-dictionary for native Lindera tokenizer" },
|
||||
|
||||
# lru: soundness issue in IterMut. Reached only through aws-sdk-s3 in
|
||||
# LanceDB's dev-dependency graph; LanceDB does not use that iterator
|
||||
# directly. Clearing this requires the AWS SDK chain to update lru.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2026-0002
|
||||
{ id = "RUSTSEC-2026-0002", reason = "transitive via aws-sdk-s3 dev-dependency; waiting on AWS SDK lru upgrade" },
|
||||
# rustls-pemfile: unmaintained. Reached from two separate chains:
|
||||
# rustls-native-certs 0.6 (via hyper-rustls 0.24) and object_store 0.12.
|
||||
# Both upstream dependencies need to move before we can drop it.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2025-0134
|
||||
{ id = "RUSTSEC-2025-0134", reason = "transitive via rustls-native-certs/object_store; waiting on upstream migration" },
|
||||
|
||||
# rustls-webpki 0.101.7 (old major line): name-constraint checks for
|
||||
# URI / wildcard names. Pulled in only via the legacy rustls 0.21 chain
|
||||
@@ -107,12 +89,6 @@ ignore = [
|
||||
# we actively use is upgraded to 0.103.13 which contains the fix.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2026-0104
|
||||
{ id = "RUSTSEC-2026-0104", reason = "only affects rustls-webpki 0.101 from legacy aws-smithy/rustls 0.21 chain" },
|
||||
|
||||
# rand 0.8.5: soundness issue only when ThreadRng reseeds inside a custom
|
||||
# logger. Reached through several transitive chains. LanceDB does not use
|
||||
# rand from a custom logger; upgrade once all pinned chains accept 0.8.6+.
|
||||
# https://rustsec.org/advisories/RUSTSEC-2026-0097
|
||||
{ id = "RUSTSEC-2026-0097", reason = "transitive rand 0.8.5; LanceDB does not call ThreadRng from custom logging" },
|
||||
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -501,34 +501,6 @@ Modeled after ``VACUUM`` in PostgreSQL.
|
||||
|
||||
***
|
||||
|
||||
### prewarmData()
|
||||
|
||||
```ts
|
||||
abstract prewarmData(columns?): Promise<void>
|
||||
```
|
||||
|
||||
Prewarm one or more columns of data in the table.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **columns?**: `string`[]
|
||||
The columns to prewarm. If undefined, all columns are prewarmed.
|
||||
This will load the column data into the page cache so that future queries that
|
||||
read those columns avoid the initial cold-start latency. This call initiates
|
||||
prewarming and returns once the request is accepted; the warming itself may
|
||||
continue in the background. Calling it on already-prewarmed columns is a
|
||||
no-op on the server.
|
||||
Prewarming is generally useful for columns used in filters or projections.
|
||||
Large columns (e.g. high-dimensional vectors or binary data) may not be
|
||||
practical to prewarm.
|
||||
This feature is currently only supported on remote tables.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### prewarmIndex()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -94,11 +94,11 @@ of raw SQL strings with [where][lancedb.query.LanceQueryBuilder.where] and
|
||||
|
||||
## Full text search
|
||||
|
||||
Use [lancedb.table.Table.create_fts_index][] for the synchronous API or
|
||||
[lancedb.table.AsyncTable.create_index][] with [lancedb.index.FTS][] for the
|
||||
asynchronous API.
|
||||
::: lancedb.fts.create_index
|
||||
|
||||
::: lancedb.index.FTS
|
||||
::: lancedb.fts.populate_index
|
||||
|
||||
::: lancedb.fts.search_index
|
||||
|
||||
## Utilities
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.0.0-beta.7</lance-core.version>
|
||||
<lance-core.version>7.0.0-beta.1</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -1870,25 +1870,6 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
expect(results.length).toBe(3);
|
||||
});
|
||||
|
||||
test("prewarmData errors on local tables", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const data = [
|
||||
{ text: "alpha", vector: [0.1, 0.2, 0.3] },
|
||||
{ text: "beta", vector: [0.4, 0.5, 0.6] },
|
||||
];
|
||||
const table = await db.createTable("prewarm_data_test", data);
|
||||
|
||||
// prewarmData is only supported on remote tables. We verify the call
|
||||
// is wired through napi and surfaces the expected error for both
|
||||
// arg shapes (undefined and string[]).
|
||||
await expect(table.prewarmData()).rejects.toThrow(
|
||||
"prewarm_data is currently only supported on remote tables",
|
||||
);
|
||||
await expect(table.prewarmData(["text"])).rejects.toThrow(
|
||||
"prewarm_data is currently only supported on remote tables",
|
||||
);
|
||||
});
|
||||
|
||||
test("full text index on list", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const data = [
|
||||
|
||||
@@ -285,25 +285,6 @@ export abstract class Table {
|
||||
*/
|
||||
abstract prewarmIndex(name: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Prewarm one or more columns of data in the table.
|
||||
*
|
||||
* @param columns The columns to prewarm. If undefined, all columns are prewarmed.
|
||||
*
|
||||
* This will load the column data into the page cache so that future queries that
|
||||
* read those columns avoid the initial cold-start latency. This call initiates
|
||||
* prewarming and returns once the request is accepted; the warming itself may
|
||||
* continue in the background. Calling it on already-prewarmed columns is a
|
||||
* no-op on the server.
|
||||
*
|
||||
* Prewarming is generally useful for columns used in filters or projections.
|
||||
* Large columns (e.g. high-dimensional vectors or binary data) may not be
|
||||
* practical to prewarm.
|
||||
*
|
||||
* This feature is currently only supported on remote tables.
|
||||
*/
|
||||
abstract prewarmData(columns?: string[]): Promise<void>;
|
||||
|
||||
/**
|
||||
* Waits for asynchronous indexing to complete on the table.
|
||||
*
|
||||
@@ -729,10 +710,6 @@ export class LocalTable extends Table {
|
||||
await this.inner.prewarmIndex(name);
|
||||
}
|
||||
|
||||
async prewarmData(columns?: string[]): Promise<void> {
|
||||
await this.inner.prewarmData(columns);
|
||||
}
|
||||
|
||||
async waitForIndex(
|
||||
indexNames: string[],
|
||||
timeoutSeconds: number,
|
||||
|
||||
@@ -75,6 +75,7 @@
|
||||
"build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js --output-dir lancedb",
|
||||
"postbuild:debug": "shx mkdir -p dist && shx cp lancedb/*.node dist/",
|
||||
"build:release": "napi build --platform --release --dts ../lancedb/native.d.ts --js ../lancedb/native.js --output-dir dist",
|
||||
"postbuild:release": "shx mkdir -p dist && shx cp lancedb/*.node dist/",
|
||||
"build": "npm run build:debug && npm run tsc",
|
||||
"build-release": "npm run build:release && npm run tsc",
|
||||
"tsc": "tsc -b",
|
||||
|
||||
@@ -159,14 +159,6 @@ impl Table {
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn prewarm_data(&self, columns: Option<Vec<String>>) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.prewarm_data(columns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn wait_for_index(&self, index_names: Vec<String>, timeout_s: i64) -> Result<()> {
|
||||
let timeout = std::time::Duration::from_secs(timeout_s.try_into().unwrap());
|
||||
|
||||
@@ -35,8 +35,7 @@ futures.workspace = true
|
||||
serde = "1"
|
||||
serde_json = "1"
|
||||
snafu.workspace = true
|
||||
tokio = { version = "1.40", features = ["sync", "rt-multi-thread"] }
|
||||
libc = "0.2"
|
||||
tokio = { version = "1.40", features = ["sync"] }
|
||||
|
||||
[build-dependencies]
|
||||
pyo3-build-config = { version = "0.28", features = [
|
||||
|
||||
@@ -7,6 +7,7 @@ import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import timedelta
|
||||
from typing import Dict, Optional, Union, Any, List
|
||||
import warnings
|
||||
|
||||
__version__ = importlib.metadata.version("lancedb")
|
||||
|
||||
@@ -437,3 +438,13 @@ __all__ = [
|
||||
"Table",
|
||||
"__version__",
|
||||
]
|
||||
|
||||
|
||||
def __warn_on_fork():
|
||||
warnings.warn(
|
||||
"lance is not fork-safe. If you are using multiprocessing, use spawn instead.",
|
||||
)
|
||||
|
||||
|
||||
if hasattr(os, "register_at_fork"):
|
||||
os.register_at_fork(before=__warn_on_fork) # type: ignore[attr-defined]
|
||||
|
||||
@@ -12,7 +12,6 @@ from .index import (
|
||||
LabelList,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
FTS,
|
||||
)
|
||||
from lance_namespace import (
|
||||
@@ -26,7 +25,6 @@ from .remote import ClientConfig
|
||||
|
||||
IvfHnswPq: type[HnswPq] = HnswPq
|
||||
IvfHnswSq: type[HnswSq] = HnswSq
|
||||
IvfHnswFlat: type[HnswFlat] = HnswFlat
|
||||
|
||||
class PyExpr:
|
||||
"""A type-safe DataFusion expression node (Rust-side handle)."""
|
||||
@@ -182,7 +180,6 @@ class Table:
|
||||
IvfPq,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
BTree,
|
||||
Bitmap,
|
||||
LabelList,
|
||||
@@ -445,7 +442,7 @@ class AsyncPermutationBuilder:
|
||||
async def execute(self) -> Table: ...
|
||||
|
||||
def async_permutation_builder(
|
||||
table: Table,
|
||||
table: Table, dest_table_name: str
|
||||
) -> AsyncPermutationBuilder: ...
|
||||
def fts_query_to_json(query: Any) -> str: ...
|
||||
|
||||
|
||||
@@ -2,9 +2,7 @@
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import threading
|
||||
import warnings
|
||||
|
||||
|
||||
class BackgroundEventLoop:
|
||||
@@ -15,9 +13,6 @@ class BackgroundEventLoop:
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._start()
|
||||
|
||||
def _start(self):
|
||||
self.loop = asyncio.new_event_loop()
|
||||
self.thread = threading.Thread(
|
||||
target=self.loop.run_forever,
|
||||
@@ -36,30 +31,3 @@ class BackgroundEventLoop:
|
||||
|
||||
|
||||
LOOP = BackgroundEventLoop()
|
||||
|
||||
_FORK_WARNED = False
|
||||
|
||||
|
||||
def _reset_after_fork():
|
||||
# Threads do not survive fork(), so the asyncio loop in LOOP.thread is
|
||||
# dead in the child. Re-initialize the singleton in place so existing
|
||||
# `from .background_loop import LOOP` references in other modules see
|
||||
# the new state. The Rust-side tokio runtime is reset analogously by a
|
||||
# pthread_atfork hook installed in the _lancedb extension.
|
||||
LOOP._start()
|
||||
global _FORK_WARNED
|
||||
if not _FORK_WARNED:
|
||||
_FORK_WARNED = True
|
||||
warnings.warn(
|
||||
"lancedb fork support is experimental: the internal async "
|
||||
"runtime has been reset in the forked child, but a small chance "
|
||||
"of deadlock remains if other state was mid-operation at fork "
|
||||
"time. The 'forkserver' or 'spawn' multiprocessing start method "
|
||||
"is likely a safer alternative.",
|
||||
RuntimeWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
if hasattr(os, "register_at_fork"):
|
||||
os.register_at_fork(after_in_child=_reset_after_fork)
|
||||
|
||||
@@ -79,7 +79,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], default []
|
||||
The parent namespace to list namespaces in.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -107,7 +106,6 @@ class DBConnection(EnforceOverrides):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to create.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
|
||||
or "overwrite" (replace if exists). Case insensitive.
|
||||
@@ -135,7 +133,6 @@ class DBConnection(EnforceOverrides):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to drop.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
|
||||
behavior: str, optional
|
||||
@@ -160,7 +157,6 @@ class DBConnection(EnforceOverrides):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to describe.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -184,7 +180,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -215,7 +210,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to list tables in.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
The token to use for pagination. If not present, start from the beginning.
|
||||
Typically, this token is last table name from the previous page.
|
||||
@@ -254,7 +248,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to create the table in.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
data: The data to initialize the table, *optional*
|
||||
User must provide at least one of `data` or `schema`.
|
||||
Acceptable types are:
|
||||
@@ -423,7 +416,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
index_cache_size: int, default 256
|
||||
**Deprecated**: Use session-level cache configuration instead.
|
||||
Create a Session with custom cache sizes and pass it to lancedb.connect().
|
||||
@@ -459,7 +451,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to drop the table from.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
@@ -483,11 +474,9 @@ class DBConnection(EnforceOverrides):
|
||||
cur_namespace_path: List[str], optional
|
||||
The namespace of the current table.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``cur_namespace`` in 0.30.2 and earlier.
|
||||
new_namespace_path: List[str], optional
|
||||
The namespace to move the table to.
|
||||
If not specified, defaults to the same as cur_namespace_path.
|
||||
Previously called ``new_namespace`` in 0.30.2 and earlier.
|
||||
If not specified, defaults to the same as cur_namespace.
|
||||
"""
|
||||
if cur_namespace_path is None:
|
||||
cur_namespace_path = []
|
||||
@@ -511,7 +500,6 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to drop all tables from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
@@ -725,7 +713,6 @@ class LanceDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The parent namespace to list namespaces in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -793,7 +780,6 @@ class LanceDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -836,7 +822,6 @@ class LanceDBConnection(DBConnection):
|
||||
----------
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
The token to use for pagination.
|
||||
limit: int, default 10
|
||||
@@ -891,7 +876,6 @@ class LanceDBConnection(DBConnection):
|
||||
----------
|
||||
namespace_path: List[str], optional
|
||||
The namespace to create the table in.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
See
|
||||
---
|
||||
@@ -965,7 +949,6 @@ class LanceDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from. When non-empty, the
|
||||
table is resolved through the directory namespace client.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1026,7 +1009,6 @@ class LanceDBConnection(DBConnection):
|
||||
target_namespace_path: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``target_namespace`` in 0.30.2 and earlier.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
@@ -1072,7 +1054,6 @@ class LanceDBConnection(DBConnection):
|
||||
The name of the table.
|
||||
namespace_path: List[str], optional
|
||||
The namespace to drop the table from.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
ignore_missing: bool, default False
|
||||
If True, ignore if the table does not exist.
|
||||
"""
|
||||
@@ -1111,10 +1092,8 @@ class LanceDBConnection(DBConnection):
|
||||
The new name of the table.
|
||||
cur_namespace_path: List[str], optional
|
||||
The namespace of the current table.
|
||||
Previously called ``cur_namespace`` in 0.30.2 and earlier.
|
||||
new_namespace_path: List[str], optional
|
||||
The namespace to move the table to.
|
||||
Previously called ``new_namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if cur_namespace_path is None:
|
||||
cur_namespace_path = []
|
||||
@@ -1237,7 +1216,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], optional
|
||||
The parent namespace to list namespaces in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
The token to use for pagination. If not present, start from the beginning.
|
||||
limit: int, optional
|
||||
@@ -1267,7 +1245,6 @@ class AsyncConnection(object):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to create.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Creation mode - "create", "exist_ok", or "overwrite". Case insensitive.
|
||||
properties: Dict[str, str], optional
|
||||
@@ -1297,7 +1274,6 @@ class AsyncConnection(object):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to drop.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
|
||||
behavior: str, optional
|
||||
@@ -1325,7 +1301,6 @@ class AsyncConnection(object):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to describe.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1348,7 +1323,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -1384,7 +1358,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
start_after: str, optional
|
||||
If present, only return names that come lexicographically after the supplied
|
||||
value.
|
||||
@@ -1436,7 +1409,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to create the table in.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
data: The data to initialize the table, *optional*
|
||||
User must provide at least one of `data` or `schema`.
|
||||
Acceptable types are:
|
||||
@@ -1660,7 +1632,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
storage_options: dict, optional
|
||||
Additional options for the storage backend. Options already set on the
|
||||
connection will be inherited by the table, but can be overridden here.
|
||||
@@ -1730,7 +1701,6 @@ class AsyncConnection(object):
|
||||
target_namespace_path: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``target_namespace`` in 0.30.2 and earlier.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
@@ -1773,11 +1743,9 @@ class AsyncConnection(object):
|
||||
cur_namespace_path: List[str], optional
|
||||
The namespace of the current table.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``cur_namespace`` in 0.30.2 and earlier.
|
||||
new_namespace_path: List[str], optional
|
||||
The namespace to move the table to.
|
||||
If not specified, defaults to the same as cur_namespace_path.
|
||||
Previously called ``new_namespace`` in 0.30.2 and earlier.
|
||||
If not specified, defaults to the same as cur_namespace.
|
||||
"""
|
||||
if cur_namespace_path is None:
|
||||
cur_namespace_path = []
|
||||
@@ -1806,7 +1774,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to drop the table from.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
ignore_missing: bool, default False
|
||||
If True, ignore if the table does not exist.
|
||||
"""
|
||||
@@ -1828,7 +1795,6 @@ class AsyncConnection(object):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to drop all tables from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
|
||||
@@ -7,7 +7,6 @@ from typing import Literal, Optional
|
||||
from ._lancedb import (
|
||||
IndexConfig,
|
||||
)
|
||||
from .types import BaseTokenizerType
|
||||
|
||||
lang_mapping = {
|
||||
"ar": "Arabic",
|
||||
@@ -112,12 +111,8 @@ class FTS:
|
||||
- "simple": Splits text by whitespace and punctuation.
|
||||
- "whitespace": Split text by whitespace, but not punctuation.
|
||||
- "raw": No tokenization. The entire text is treated as a single token.
|
||||
- "ngram": N-gram tokenizer for substring-style matching.
|
||||
- "jieba/*": Jieba tokenizer loaded from Lance's language model home.
|
||||
- "lindera/*": Lindera tokenizer loaded from Lance's language model home.
|
||||
language : str, default "English"
|
||||
The language to use for stemming and stop-word removal. This is not the
|
||||
primary way to enable CJK tokenization.
|
||||
The language to use for tokenization.
|
||||
max_token_length : int, default 40
|
||||
The maximum token length to index. Tokens longer than this length will be
|
||||
ignored.
|
||||
@@ -132,17 +127,10 @@ class FTS:
|
||||
ascii_folding : bool, default True
|
||||
Whether to fold ASCII characters. This converts accented characters to
|
||||
their ASCII equivalent. For example, "café" would be converted to "cafe".
|
||||
|
||||
Notes
|
||||
-----
|
||||
Model-backed tokenizers such as ``jieba/default`` and ``lindera/ipadic``
|
||||
require tokenizer models in Lance's language model home. Set
|
||||
``LANCE_LANGUAGE_MODEL_HOME`` to override the default platform data
|
||||
directory under ``lance/language_models``.
|
||||
"""
|
||||
|
||||
with_position: bool = False
|
||||
base_tokenizer: BaseTokenizerType = "simple"
|
||||
base_tokenizer: Literal["simple", "raw", "whitespace"] = "simple"
|
||||
language: str = "English"
|
||||
max_token_length: Optional[int] = 40
|
||||
lower_case: bool = True
|
||||
@@ -388,98 +376,9 @@ class HnswSq:
|
||||
target_partition_size: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class HnswFlat:
|
||||
"""Describe a HNSW-FLAT index configuration.
|
||||
|
||||
HNSW-FLAT stands for Hierarchical Navigable Small World without quantization.
|
||||
It stores raw vectors in the HNSW graph, providing the highest recall among
|
||||
the IVF_HNSW family at the cost of more memory and disk space compared to
|
||||
:class:`HnswSq` or :class:`HnswPq`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
distance_type: str, default "l2"
|
||||
|
||||
The distance metric used to train the index.
|
||||
|
||||
The following distance types are available:
|
||||
|
||||
"l2" - Euclidean distance. This is a very common distance metric that
|
||||
accounts for both magnitude and direction when determining the distance
|
||||
between vectors. l2 distance has a range of [0, ∞).
|
||||
|
||||
"cosine" - Cosine distance. Cosine distance is a distance metric
|
||||
calculated from the cosine similarity between two vectors. Cosine
|
||||
similarity is a measure of similarity between two non-zero vectors of an
|
||||
inner product space. It is defined to equal the cosine of the angle
|
||||
between them. Unlike l2, the cosine distance is not affected by the
|
||||
magnitude of the vectors. Cosine distance has a range of [0, 2].
|
||||
|
||||
"dot" - Dot product. Dot distance is the dot product of two vectors. Dot
|
||||
distance has a range of (-∞, ∞). If the vectors are normalized (i.e. their
|
||||
l2 norm is 1), then dot distance is equivalent to the cosine distance.
|
||||
|
||||
num_partitions, default sqrt(num_rows)
|
||||
|
||||
The number of IVF partitions to create.
|
||||
|
||||
For HNSW, we recommend a small number of partitions. Setting this to 1
|
||||
works well for most tables. For very large tables, training just one HNSW
|
||||
graph will require too much memory. Each partition becomes its own HNSW
|
||||
graph, so setting this value higher reduces the peak memory use of
|
||||
training.
|
||||
|
||||
max_iterations, default 50
|
||||
|
||||
Max iterations to train kmeans.
|
||||
|
||||
When training an IVF index we use kmeans to calculate the partitions.
|
||||
This parameter controls how many iterations of kmeans to run.
|
||||
|
||||
sample_rate, default 256
|
||||
|
||||
The rate used to calculate the number of training vectors for kmeans.
|
||||
|
||||
m, default 20
|
||||
|
||||
The number of neighbors to select for each vector in the HNSW graph.
|
||||
|
||||
This value controls the tradeoff between search speed and accuracy.
|
||||
The higher the value the more accurate the search but the slower it
|
||||
will be.
|
||||
|
||||
ef_construction, default 300
|
||||
|
||||
The number of candidates to evaluate during the construction of the HNSW
|
||||
graph.
|
||||
|
||||
This value controls the tradeoff between build speed and accuracy.
|
||||
The higher the value the more accurate the build but the slower it will
|
||||
be. 150 to 300 is the typical range. 100 is a minimum for good quality
|
||||
search results. In most cases, there is no benefit to setting this higher
|
||||
than 500. This value should be set to a value that is not less than `ef`
|
||||
in the search phase.
|
||||
|
||||
target_partition_size, default is 1,048,576
|
||||
|
||||
The target size of each partition.
|
||||
"""
|
||||
|
||||
distance_type: Literal["l2", "cosine", "dot"] = "l2"
|
||||
num_partitions: Optional[int] = None
|
||||
max_iterations: int = 50
|
||||
sample_rate: int = 256
|
||||
m: int = 20
|
||||
ef_construction: int = 300
|
||||
target_partition_size: Optional[int] = None
|
||||
|
||||
|
||||
# Backwards-compatible aliases
|
||||
IvfHnswPq = HnswPq
|
||||
IvfHnswSq = HnswSq
|
||||
IvfHnswFlat = HnswFlat
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -799,13 +698,11 @@ __all__ = [
|
||||
"IvfPq",
|
||||
"IvfHnswPq",
|
||||
"IvfHnswSq",
|
||||
"IvfHnswFlat",
|
||||
"IvfSq",
|
||||
"IvfRq",
|
||||
"IvfFlat",
|
||||
"HnswPq",
|
||||
"HnswSq",
|
||||
"HnswFlat",
|
||||
"IndexConfig",
|
||||
"FTS",
|
||||
"Bitmap",
|
||||
|
||||
@@ -201,8 +201,7 @@ def _execute_server_side_query(
|
||||
Parameters
|
||||
----------
|
||||
namespace_client : LanceNamespace
|
||||
The namespace client to use.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
The namespace client to use
|
||||
table_id : List[str]
|
||||
The table identifier (namespace path + table name)
|
||||
query : Query
|
||||
@@ -389,8 +388,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
Parameters
|
||||
----------
|
||||
namespace_client : LanceNamespace
|
||||
The namespace client to use for table management.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
The namespace client to use for table management
|
||||
read_consistency_interval : Optional[timedelta]
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked.
|
||||
@@ -626,8 +624,6 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace_path : Optional[List[str]]
|
||||
The parent namespace path to list children from.
|
||||
If None, lists root-level namespaces.
|
||||
|
||||
*Changed in version 0.31.0: renamed from* ``namespace``.
|
||||
page_token : Optional[str]
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -864,8 +860,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
Parameters
|
||||
----------
|
||||
namespace_client : LanceNamespace
|
||||
The namespace client to use for table management.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
The namespace client to use for table management
|
||||
read_consistency_interval : Optional[timedelta]
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked.
|
||||
@@ -1048,8 +1043,6 @@ class AsyncLanceNamespaceDBConnection:
|
||||
namespace_path : Optional[List[str]]
|
||||
The parent namespace path to list children from.
|
||||
If None, lists root-level namespaces.
|
||||
|
||||
*Changed in version 0.31.0: renamed from* ``namespace``.
|
||||
page_token : Optional[str]
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import copy
|
||||
import json
|
||||
|
||||
from deprecation import deprecated
|
||||
from lancedb import AsyncConnection, DBConnection
|
||||
import pyarrow as pa
|
||||
import json
|
||||
|
||||
from ._lancedb import async_permutation_builder, PermutationReader
|
||||
from .table import LanceTable
|
||||
@@ -37,7 +36,10 @@ class PermutationBuilder:
|
||||
be referenced by name in the future. If names are not provided then they can only
|
||||
be referenced by their ordinal index. There is no requirement to name every split.
|
||||
|
||||
The permutation is stored in memory and will be lost when the program exits.
|
||||
By default, the permutation will be stored in memory and will be lost when the
|
||||
program exits. To persist the permutation (for very large datasets or to share
|
||||
the permutation across multiple workers) use the [persist](#persist) method to
|
||||
create a permanent table.
|
||||
"""
|
||||
|
||||
def __init__(self, table: LanceTable):
|
||||
@@ -49,6 +51,15 @@ class PermutationBuilder:
|
||||
"""
|
||||
self._async = async_permutation_builder(table)
|
||||
|
||||
def persist(
|
||||
self, database: Union[DBConnection, AsyncConnection], table_name: str
|
||||
) -> "PermutationBuilder":
|
||||
"""
|
||||
Persist the permutation to the given database.
|
||||
"""
|
||||
self._async.persist(database, table_name)
|
||||
return self
|
||||
|
||||
def split_random(
|
||||
self,
|
||||
*,
|
||||
@@ -369,44 +380,20 @@ class Permutation:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable],
|
||||
split: int,
|
||||
reader: PermutationReader,
|
||||
selection: dict[str, str],
|
||||
batch_size: int,
|
||||
transform_fn: Callable[pa.RecordBatch, Any],
|
||||
offset: Optional[int] = None,
|
||||
limit: Optional[int] = None,
|
||||
connection_factory: Optional[Callable[[str], LanceTable]] = None,
|
||||
_reader: Optional[PermutationReader] = None,
|
||||
):
|
||||
"""
|
||||
Internal constructor. Use [from_tables](#from_tables) instead.
|
||||
"""
|
||||
assert base_table is not None, "base_table is required"
|
||||
assert reader is not None, "reader is required"
|
||||
assert selection is not None, "selection is required"
|
||||
self.base_table = base_table
|
||||
self.permutation_table = permutation_table
|
||||
self.split = split
|
||||
self.reader = reader
|
||||
self.selection = selection
|
||||
self.transform_fn = transform_fn
|
||||
self.batch_size = batch_size
|
||||
self.offset = offset
|
||||
self.limit = limit
|
||||
self.connection_factory = connection_factory
|
||||
if _reader is None:
|
||||
_reader = LOOP.run(self._build_reader())
|
||||
self.reader: PermutationReader = _reader
|
||||
|
||||
async def _build_reader(self) -> PermutationReader:
|
||||
reader = await PermutationReader.from_tables(
|
||||
self.base_table, self.permutation_table, self.split
|
||||
)
|
||||
if self.offset is not None:
|
||||
reader = await reader.with_offset(self.offset)
|
||||
if self.limit is not None:
|
||||
reader = await reader.with_limit(self.limit)
|
||||
return reader
|
||||
|
||||
def _with_selection(self, selection: dict[str, str]) -> "Permutation":
|
||||
"""
|
||||
@@ -415,97 +402,21 @@ class Permutation:
|
||||
Does not validation of the selection and it replaces it entirely. This is not
|
||||
intended for public use.
|
||||
"""
|
||||
new = copy.copy(self)
|
||||
new.selection = selection
|
||||
return new
|
||||
return Permutation(self.reader, selection, self.batch_size, self.transform_fn)
|
||||
|
||||
def _with_reader(self, reader: PermutationReader) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation with the given reader
|
||||
|
||||
This is an internal method and should not be used directly.
|
||||
"""
|
||||
return Permutation(reader, self.selection, self.batch_size, self.transform_fn)
|
||||
|
||||
def with_batch_size(self, batch_size: int) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation with the given batch size
|
||||
"""
|
||||
new = copy.copy(self)
|
||||
new.batch_size = batch_size
|
||||
return new
|
||||
|
||||
def with_connection_factory(
|
||||
self, connection_factory: Callable[[str], LanceTable]
|
||||
) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation that will use ``connection_factory`` to reopen
|
||||
the base table when this permutation is unpickled in a worker process.
|
||||
|
||||
The factory is a callable that takes a single argument — the base table
|
||||
name — and returns a [LanceTable]. It must be picklable; the worker
|
||||
will pickle it via standard ``pickle`` and call it to recover the base
|
||||
table. Picklable callables in practice means top-level (module-level)
|
||||
functions, ``functools.partial`` of such functions, or instances of
|
||||
picklable classes implementing ``__call__``. Lambdas and closures over
|
||||
local variables don't pickle with the default protocol.
|
||||
|
||||
Setting a factory is necessary when the URI alone is not enough to
|
||||
re-open the connection — most importantly for LanceDB Cloud (``db://``)
|
||||
connections, where ``api_key`` and ``region`` aren't recoverable from
|
||||
the connection object after construction.
|
||||
|
||||
For local file or cloud-storage paths the factory is optional: if not
|
||||
set, ``__getstate__`` falls back to capturing
|
||||
``(uri, storage_options, namespace_path)`` and re-opening via
|
||||
``lancedb.connect(uri, storage_options=...)``.
|
||||
|
||||
Examples
|
||||
--------
|
||||
Basic native (file-system path), parameterized via ``functools.partial``::
|
||||
|
||||
import functools, lancedb
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
def open_native_table(uri: str, table_name: str):
|
||||
return lancedb.connect(uri).open_table(table_name)
|
||||
|
||||
factory = functools.partial(open_native_table, "/data/lance_db")
|
||||
permutation = Permutation.identity(
|
||||
factory("training")
|
||||
).with_connection_factory(factory)
|
||||
|
||||
Native via :func:`lancedb.connect_namespace` (e.g. a directory- or
|
||||
REST-backed namespace client). The factory takes the
|
||||
implementation name and properties dict as partial-bound args so
|
||||
the worker can rebuild the same namespace connection::
|
||||
|
||||
def open_via_namespace(
|
||||
impl: str, properties: dict[str, str], table_name: str,
|
||||
):
|
||||
return lancedb.connect_namespace(impl, properties).open_table(
|
||||
table_name,
|
||||
)
|
||||
|
||||
factory = functools.partial(
|
||||
open_via_namespace,
|
||||
"dir",
|
||||
{"root": "/data/lance_db"},
|
||||
)
|
||||
|
||||
LanceDB Cloud, reading credentials from env vars at worker startup
|
||||
so secrets aren't pickled into the dataset::
|
||||
|
||||
import os, lancedb
|
||||
|
||||
def open_remote_table(table_name: str):
|
||||
db = lancedb.connect(
|
||||
"db://my-database",
|
||||
api_key=os.environ["LANCEDB_API_KEY"],
|
||||
region=os.environ.get("LANCEDB_REGION", "us-east-1"),
|
||||
)
|
||||
return db.open_table(table_name)
|
||||
|
||||
permutation = Permutation.identity(
|
||||
open_remote_table("training")
|
||||
).with_connection_factory(open_remote_table)
|
||||
"""
|
||||
assert connection_factory is not None, "connection_factory is required"
|
||||
new = copy.copy(self)
|
||||
new.connection_factory = connection_factory
|
||||
return new
|
||||
return Permutation(self.reader, self.selection, batch_size, self.transform_fn)
|
||||
|
||||
@classmethod
|
||||
def identity(cls, table: LanceTable) -> "Permutation":
|
||||
@@ -578,126 +489,11 @@ class Permutation:
|
||||
schema = await reader.output_schema(None)
|
||||
initial_selection = {name: name for name in schema.names}
|
||||
return cls(
|
||||
base_table,
|
||||
permutation_table,
|
||||
split,
|
||||
initial_selection,
|
||||
DEFAULT_BATCH_SIZE,
|
||||
Transforms.arrow2python,
|
||||
_reader=reader,
|
||||
reader, initial_selection, DEFAULT_BATCH_SIZE, Transforms.arrow2python
|
||||
)
|
||||
|
||||
return LOOP.run(do_from_tables())
|
||||
|
||||
def __getstate__(self) -> dict[str, Any]:
|
||||
"""Build a picklable state dict for this permutation.
|
||||
|
||||
The base table is captured either via a user-supplied
|
||||
``connection_factory`` (see [with_connection_factory]) or, as a
|
||||
fallback, by introspecting ``(uri, storage_options, namespace_path)``
|
||||
on the connection. The permutation table — always an in-memory
|
||||
LanceDB table — is captured as a pyarrow Table (which pickles via
|
||||
Arrow IPC natively). The reader is dropped from the wire format;
|
||||
``__setstate__`` rebuilds it from the restored tables.
|
||||
"""
|
||||
permutation_data: Optional[pa.Table] = None
|
||||
if self.permutation_table is not None:
|
||||
permutation_data = self.permutation_table.to_arrow()
|
||||
|
||||
common = {
|
||||
"base_table_name": self.base_table.name,
|
||||
"permutation_data": permutation_data,
|
||||
"split": self.split,
|
||||
"selection": self.selection,
|
||||
"batch_size": self.batch_size,
|
||||
"transform_fn": self.transform_fn,
|
||||
"offset": self.offset,
|
||||
"limit": self.limit,
|
||||
"connection_factory": self.connection_factory,
|
||||
}
|
||||
|
||||
if self.connection_factory is not None:
|
||||
# The factory carries enough state to recover the base table on
|
||||
# its own; we don't need to capture the URI / storage options /
|
||||
# namespace from the existing connection.
|
||||
return common
|
||||
|
||||
# URI-introspection fallback: only viable for native (OSS) connections
|
||||
# where (uri, storage_options) is enough to reopen. Remote / cloud
|
||||
# connections don't expose recoverable api_key / region — those users
|
||||
# must call with_connection_factory().
|
||||
try:
|
||||
base_uri = self.base_table._conn.uri
|
||||
storage_options = self.base_table._conn.storage_options
|
||||
except AttributeError as e:
|
||||
raise ValueError(
|
||||
"Cannot pickle this Permutation: the base table's connection "
|
||||
"does not expose a uri/storage_options, which usually means it "
|
||||
"is a remote (LanceDB Cloud) connection. Call "
|
||||
"Permutation.with_connection_factory(...) first to provide a "
|
||||
"picklable callable that re-opens the base table from a worker "
|
||||
"process."
|
||||
) from e
|
||||
|
||||
if base_uri.startswith("memory://"):
|
||||
# In-memory base tables don't exist in any worker process by
|
||||
# default, so dump the entire base table into the pickle. This
|
||||
# can be expensive for large datasets — users with large
|
||||
# in-memory base tables should either persist them or set a
|
||||
# connection_factory.
|
||||
return {
|
||||
**common,
|
||||
"base_table_data": self.base_table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
**common,
|
||||
"base_table_uri": base_uri,
|
||||
"base_table_namespace": self.base_table._namespace_path,
|
||||
"base_table_storage_options": storage_options,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict[str, Any]) -> None:
|
||||
from . import connect
|
||||
|
||||
connection_factory = state["connection_factory"]
|
||||
if connection_factory is not None:
|
||||
base_table = connection_factory(state["base_table_name"])
|
||||
elif "base_table_data" in state:
|
||||
# In-memory base table inlined into the pickle; rebuild the same
|
||||
# way we rebuild the in-memory permutation table.
|
||||
mem_db = connect("memory://")
|
||||
base_table = mem_db.create_table(
|
||||
state["base_table_name"], state["base_table_data"]
|
||||
)
|
||||
else:
|
||||
base_db = connect(
|
||||
state["base_table_uri"],
|
||||
storage_options=state["base_table_storage_options"],
|
||||
)
|
||||
base_table = base_db.open_table(
|
||||
state["base_table_name"],
|
||||
namespace_path=state["base_table_namespace"] or None,
|
||||
)
|
||||
|
||||
permutation_table: Optional[LanceTable] = None
|
||||
if state["permutation_data"] is not None:
|
||||
mem_db = connect("memory://")
|
||||
permutation_table = mem_db.create_table(
|
||||
"permutation", state["permutation_data"]
|
||||
)
|
||||
|
||||
self.base_table = base_table
|
||||
self.permutation_table = permutation_table
|
||||
self.split = state["split"]
|
||||
self.selection = state["selection"]
|
||||
self.batch_size = state["batch_size"]
|
||||
self.transform_fn = state["transform_fn"]
|
||||
self.offset = state["offset"]
|
||||
self.limit = state["limit"]
|
||||
self.connection_factory = connection_factory
|
||||
self.reader = LOOP.run(self._build_reader())
|
||||
|
||||
@property
|
||||
def schema(self) -> pa.Schema:
|
||||
async def do_output_schema():
|
||||
@@ -964,9 +760,7 @@ class Permutation:
|
||||
for expensive operations such as image decoding.
|
||||
"""
|
||||
assert transform is not None, "transform is required"
|
||||
new = copy.copy(self)
|
||||
new.transform_fn = transform
|
||||
return new
|
||||
return Permutation(self.reader, self.selection, self.batch_size, transform)
|
||||
|
||||
def __getitem__(self, index: int) -> Any:
|
||||
"""
|
||||
@@ -1001,10 +795,12 @@ class Permutation:
|
||||
"""
|
||||
Skip the first `skip` rows of the permutation
|
||||
"""
|
||||
new = copy.copy(self)
|
||||
new.offset = skip
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
return new
|
||||
|
||||
async def do_with_skip():
|
||||
reader = await self.reader.with_offset(skip)
|
||||
return self._with_reader(reader)
|
||||
|
||||
return LOOP.run(do_with_skip())
|
||||
|
||||
@deprecated(details="Use with_take instead")
|
||||
def take(self, limit: int) -> "Permutation":
|
||||
@@ -1022,10 +818,12 @@ class Permutation:
|
||||
"""
|
||||
Limit the permutation to `limit` rows (following any `skip`)
|
||||
"""
|
||||
new = copy.copy(self)
|
||||
new.limit = limit
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
return new
|
||||
|
||||
async def do_with_take():
|
||||
reader = await self.reader.with_limit(limit)
|
||||
return self._with_reader(reader)
|
||||
|
||||
return LOOP.run(do_with_take())
|
||||
|
||||
@deprecated(details="Use with_repeat instead")
|
||||
def repeat(self, times: int) -> "Permutation":
|
||||
|
||||
@@ -123,7 +123,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The parent namespace to list namespaces in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -156,7 +155,6 @@ class RemoteDBConnection(DBConnection):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to create.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
|
||||
or "overwrite" (replace if exists). Case insensitive.
|
||||
@@ -187,7 +185,6 @@ class RemoteDBConnection(DBConnection):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to drop.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
mode: str, optional
|
||||
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
|
||||
behavior: str, optional
|
||||
@@ -215,7 +212,6 @@ class RemoteDBConnection(DBConnection):
|
||||
----------
|
||||
namespace_path: List[str]
|
||||
The namespace identifier to describe.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -238,7 +234,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to list tables in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str, optional
|
||||
Token for pagination. Use the token from a previous response
|
||||
to get the next page of results.
|
||||
@@ -276,7 +271,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], default []
|
||||
The namespace to list tables in.
|
||||
Empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
page_token: str
|
||||
The last token to start the new page.
|
||||
limit: int, default 10
|
||||
@@ -319,7 +313,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -359,7 +352,6 @@ class RemoteDBConnection(DBConnection):
|
||||
target_namespace_path: List[str], optional
|
||||
The namespace for the target table.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``target_namespace`` in 0.30.2 and earlier.
|
||||
source_version: int, optional
|
||||
The version of the source table to clone.
|
||||
source_tag: str, optional
|
||||
@@ -411,7 +403,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to create the table in.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
data: The data to initialize the table, *optional*
|
||||
User must provide at least one of `data` or `schema`.
|
||||
Acceptable types are:
|
||||
@@ -545,7 +536,6 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to drop the table from.
|
||||
None or empty list represents root namespace.
|
||||
Previously called ``namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
@@ -567,12 +557,6 @@ class RemoteDBConnection(DBConnection):
|
||||
The current name of the table.
|
||||
new_name: str
|
||||
The new name of the table.
|
||||
cur_namespace_path: List[str], optional
|
||||
The namespace of the current table.
|
||||
Previously called ``cur_namespace`` in 0.30.2 and earlier.
|
||||
new_namespace_path: List[str], optional
|
||||
The namespace to move the table to.
|
||||
Previously called ``new_namespace`` in 0.30.2 and earlier.
|
||||
"""
|
||||
if cur_namespace_path is None:
|
||||
cur_namespace_path = []
|
||||
|
||||
@@ -22,7 +22,6 @@ from lancedb.index import (
|
||||
FTS,
|
||||
BTree,
|
||||
Bitmap,
|
||||
HnswFlat,
|
||||
HnswSq,
|
||||
IvfFlat,
|
||||
IvfPq,
|
||||
@@ -40,7 +39,6 @@ from lancedb.table import _normalize_progress
|
||||
|
||||
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder
|
||||
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
|
||||
from ..types import BaseTokenizerType
|
||||
|
||||
|
||||
class RemoteTable(Table):
|
||||
@@ -169,7 +167,7 @@ class RemoteTable(Table):
|
||||
wait_timeout: Optional[timedelta] = None,
|
||||
with_position: bool = False,
|
||||
# tokenizer configs:
|
||||
base_tokenizer: BaseTokenizerType = "simple",
|
||||
base_tokenizer: str = "simple",
|
||||
language: str = "English",
|
||||
max_token_length: Optional[int] = 40,
|
||||
lower_case: bool = True,
|
||||
@@ -286,15 +284,13 @@ class RemoteTable(Table):
|
||||
)
|
||||
elif index_type == "IVF_HNSW_SQ":
|
||||
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
|
||||
elif index_type == "IVF_HNSW_FLAT":
|
||||
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
elif index_type == "IVF_FLAT":
|
||||
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unknown vector index type: {index_type}. Valid options are"
|
||||
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
|
||||
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
|
||||
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'"
|
||||
)
|
||||
|
||||
LOOP.run(
|
||||
|
||||
@@ -57,7 +57,6 @@ from .index import (
|
||||
LabelList,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
FTS,
|
||||
)
|
||||
from .merge import LanceMergeInsertBuilder
|
||||
@@ -87,59 +86,6 @@ from .util import (
|
||||
)
|
||||
from .index import lang_mapping
|
||||
|
||||
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
|
||||
_MODEL_BACKED_TOKENIZER_ERRORS = (
|
||||
"unknown base tokenizer",
|
||||
"Invalid directory path:",
|
||||
"Failed to load Jieba",
|
||||
"Failed to load tokenizer config",
|
||||
"Failed to initialize default tokenizer",
|
||||
)
|
||||
|
||||
|
||||
def _add_unique_note(exception: BaseException, note: str) -> None:
|
||||
existing_notes = getattr(exception, "__notes__", ()) or ()
|
||||
message = (
|
||||
exception.args[0]
|
||||
if exception.args and isinstance(exception.args[0], str)
|
||||
else ""
|
||||
)
|
||||
if note not in existing_notes and note not in message:
|
||||
add_note(exception, note)
|
||||
|
||||
|
||||
def _is_model_backed_tokenizer(base_tokenizer: str) -> bool:
|
||||
return any(
|
||||
base_tokenizer == prefix or base_tokenizer.startswith(f"{prefix}/")
|
||||
for prefix in _MODEL_BACKED_TOKENIZER_PREFIXES
|
||||
)
|
||||
|
||||
|
||||
def _maybe_add_fts_error_note(
|
||||
exception: BaseException, *, base_tokenizer: str, language: Optional[str] = None
|
||||
) -> None:
|
||||
message = str(exception)
|
||||
if language is not None and "not support the requested language" in message:
|
||||
supported_langs = ", ".join(lang_mapping.values())
|
||||
_add_unique_note(exception, f"Supported languages: {supported_langs}")
|
||||
return
|
||||
|
||||
if not _is_model_backed_tokenizer(base_tokenizer):
|
||||
return
|
||||
|
||||
if not any(marker in message for marker in _MODEL_BACKED_TOKENIZER_ERRORS):
|
||||
return
|
||||
|
||||
_add_unique_note(
|
||||
exception,
|
||||
"Model-backed tokenizers such as 'jieba/default' and 'lindera/ipadic' "
|
||||
"require tokenizer models in Lance's language model home. Set "
|
||||
"LANCE_LANGUAGE_MODEL_HOME to override the default platform data "
|
||||
"directory under 'lance/language_models'. Expected layouts include "
|
||||
"'<model-home>/jieba/default/...' and "
|
||||
"'<model-home>/lindera/ipadic/...'.",
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .db import LanceDBConnection
|
||||
@@ -1012,10 +958,7 @@ class Table(ABC):
|
||||
tokenizer_name: str, default "default"
|
||||
A compatibility alias for native tokenizer configs. Can be "raw",
|
||||
"default" or the 2 letter language code followed by "_stem". So
|
||||
for english it would be "en_stem". For new native FTS indexes, use
|
||||
``base_tokenizer`` directly; ``tokenizer_name`` is a legacy
|
||||
compatibility alias and does not expose model-backed tokenizer names
|
||||
such as ``jieba/default`` or ``lindera/ipadic``.
|
||||
for english it would be "en_stem".
|
||||
use_tantivy: bool, default False
|
||||
Deprecated legacy Tantivy parameter. Setting this to True raises an
|
||||
error.
|
||||
@@ -1029,11 +972,8 @@ class Table(ABC):
|
||||
- "whitespace": Split text by whitespace, but not punctuation.
|
||||
- "raw": No tokenization. The entire text is treated as a single token.
|
||||
- "ngram": N-Gram tokenizer.
|
||||
- "jieba/*": Jieba tokenizer loaded from Lance's language model home.
|
||||
- "lindera/*": Lindera tokenizer loaded from Lance's language model home.
|
||||
language : str, default "English"
|
||||
The language to use for stemming and stop-word removal. This is not
|
||||
the primary way to enable CJK tokenization.
|
||||
The language to use for tokenization.
|
||||
max_token_length : int, default 40
|
||||
The maximum token length to index. Tokens longer than this length will be
|
||||
ignored.
|
||||
@@ -1059,13 +999,6 @@ class Table(ABC):
|
||||
The timeout to wait if indexing is asynchronous.
|
||||
name: str, optional
|
||||
The name of the index. If not provided, a default name will be generated.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Model-backed tokenizers such as ``jieba/default`` and ``lindera/ipadic``
|
||||
require tokenizer models in Lance's language model home. Set
|
||||
``LANCE_LANGUAGE_MODEL_HOME`` to override the default platform data
|
||||
directory under ``lance/language_models``.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -2237,13 +2170,7 @@ class LanceTable(Table):
|
||||
index_cache_size: Optional[int] = None,
|
||||
num_bits: int = 8,
|
||||
index_type: Literal[
|
||||
"IVF_FLAT",
|
||||
"IVF_SQ",
|
||||
"IVF_PQ",
|
||||
"IVF_RQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"IVF_HNSW_FLAT",
|
||||
"IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_RQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
|
||||
] = "IVF_PQ",
|
||||
max_iterations: int = 50,
|
||||
sample_rate: int = 256,
|
||||
@@ -2330,16 +2257,6 @@ class LanceTable(Table):
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
)
|
||||
elif index_type == "IVF_HNSW_FLAT":
|
||||
config = HnswFlat(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
m=m,
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown index type {index_type}")
|
||||
|
||||
@@ -2545,22 +2462,14 @@ class LanceTable(Table):
|
||||
**tokenizer_configs,
|
||||
)
|
||||
|
||||
try:
|
||||
LOOP.run(
|
||||
self._table.create_index(
|
||||
field_names,
|
||||
replace=replace,
|
||||
config=config,
|
||||
name=name,
|
||||
)
|
||||
LOOP.run(
|
||||
self._table.create_index(
|
||||
field_names,
|
||||
replace=replace,
|
||||
config=config,
|
||||
name=name,
|
||||
)
|
||||
except (ValueError, RuntimeError) as e:
|
||||
_maybe_add_fts_error_note(
|
||||
e,
|
||||
base_tokenizer=config.base_tokenizer,
|
||||
language=config.language,
|
||||
)
|
||||
raise e
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def infer_tokenizer_configs(tokenizer_name: str) -> dict:
|
||||
@@ -3890,18 +3799,7 @@ class AsyncTable:
|
||||
*,
|
||||
replace: Optional[bool] = None,
|
||||
config: Optional[
|
||||
Union[
|
||||
IvfFlat,
|
||||
IvfPq,
|
||||
IvfRq,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
BTree,
|
||||
Bitmap,
|
||||
LabelList,
|
||||
FTS,
|
||||
]
|
||||
Union[IvfFlat, IvfPq, IvfRq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS]
|
||||
] = None,
|
||||
wait_timeout: Optional[timedelta] = None,
|
||||
name: Optional[str] = None,
|
||||
@@ -3948,7 +3846,6 @@ class AsyncTable:
|
||||
IvfRq,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
BTree,
|
||||
Bitmap,
|
||||
LabelList,
|
||||
@@ -3968,13 +3865,11 @@ class AsyncTable:
|
||||
name=name,
|
||||
train=train,
|
||||
)
|
||||
except (ValueError, RuntimeError) as e:
|
||||
if isinstance(config, FTS):
|
||||
_maybe_add_fts_error_note(
|
||||
e,
|
||||
base_tokenizer=config.base_tokenizer,
|
||||
language=config.language,
|
||||
)
|
||||
except ValueError as e:
|
||||
if "not support the requested language" in str(e):
|
||||
supported_langs = ", ".join(lang_mapping.values())
|
||||
help_msg = f"Supported languages: {supported_langs}"
|
||||
add_note(e, help_msg)
|
||||
raise e
|
||||
|
||||
async def drop_index(self, name: str) -> None:
|
||||
@@ -5119,7 +5014,6 @@ class IndexStatistics:
|
||||
"IVF_RQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"IVF_HNSW_FLAT",
|
||||
"FTS",
|
||||
"BTREE",
|
||||
"BITMAP",
|
||||
|
||||
@@ -24,7 +24,6 @@ VectorIndexType = Literal[
|
||||
"IVF_PQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"IVF_HNSW_FLAT",
|
||||
"IVF_RQ",
|
||||
]
|
||||
ScalarIndexType = Literal["BTREE", "BITMAP", "LABEL_LIST"]
|
||||
@@ -32,7 +31,6 @@ IndexType = Literal[
|
||||
"IVF_PQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_FLAT",
|
||||
"IVF_SQ",
|
||||
"FTS",
|
||||
"BTREE",
|
||||
@@ -42,5 +40,4 @@ IndexType = Literal[
|
||||
]
|
||||
|
||||
# Tokenizer literals
|
||||
BuiltinTokenizerType = Literal["simple", "raw", "whitespace", "ngram"]
|
||||
BaseTokenizerType = BuiltinTokenizerType | str
|
||||
BaseTokenizerType = Literal["simple", "raw", "whitespace", "ngram"]
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
我们 98740 r
|
||||
都 202780 d
|
||||
有 423765 v
|
||||
光明 1219 n
|
||||
的 318825 uj
|
||||
前途 1263 n
|
||||
前 62779 f
|
||||
途 857 n
|
||||
@@ -1,4 +0,0 @@
|
||||
segmenter:
|
||||
mode: "normal"
|
||||
dictionary:
|
||||
path: "./python/tests/models/lindera/ipadic/main"
|
||||
Binary file not shown.
@@ -15,10 +15,7 @@
|
||||
# limitations under the License.
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
from unittest import mock
|
||||
from pathlib import Path
|
||||
import zipfile
|
||||
|
||||
import lancedb as ldb
|
||||
from lancedb.db import DBConnection
|
||||
@@ -39,8 +36,6 @@ import pytest
|
||||
import pytest_asyncio
|
||||
from utils import exception_output
|
||||
|
||||
TEST_LANGUAGE_MODEL_HOME = Path(__file__).parent / "models"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def table(tmp_path) -> ldb.table.LanceTable:
|
||||
@@ -94,40 +89,6 @@ def table(tmp_path) -> ldb.table.LanceTable:
|
||||
return table
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def language_model_home(monkeypatch, tmp_path):
|
||||
model_home = tmp_path / "language-models"
|
||||
shutil.copytree(TEST_LANGUAGE_MODEL_HOME, model_home)
|
||||
monkeypatch.setenv("LANCE_LANGUAGE_MODEL_HOME", str(model_home))
|
||||
return model_home
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lindera_ipadic(language_model_home):
|
||||
model_path = language_model_home / "lindera" / "ipadic"
|
||||
extracted_model = model_path / "main"
|
||||
config_path = model_path / "config.yml"
|
||||
|
||||
if extracted_model.exists():
|
||||
shutil.rmtree(extracted_model)
|
||||
|
||||
with zipfile.ZipFile(model_path / "main.zip", "r") as zip_ref:
|
||||
zip_ref.extractall(model_path)
|
||||
config_path.write_text(
|
||||
"segmenter:\n"
|
||||
' mode: "normal"\n'
|
||||
" dictionary:\n"
|
||||
f' path: "{extracted_model.resolve().as_posix()}"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if extracted_model.exists():
|
||||
shutil.rmtree(extracted_model)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def async_table(tmp_path) -> ldb.table.AsyncTable:
|
||||
# Use local random state to avoid affecting other tests
|
||||
@@ -723,90 +684,6 @@ def test_fts_ngram(mem_db: DBConnection):
|
||||
assert set(r["text"] for r in results) == {"lance database", "lance is cool"}
|
||||
|
||||
|
||||
def test_fts_jieba_tokenizer(mem_db: DBConnection, language_model_home):
|
||||
data = pa.table({"text": ["我们都有光明的前途", "光明的前途"]})
|
||||
table = mem_db.create_table("test_jieba", data=data)
|
||||
table.create_fts_index(
|
||||
"text",
|
||||
base_tokenizer="jieba/default",
|
||||
stem=False,
|
||||
remove_stop_words=False,
|
||||
ascii_folding=False,
|
||||
)
|
||||
|
||||
results = table.search("我们", query_type="fts").limit(10).to_list()
|
||||
assert [row["text"] for row in results] == ["我们都有光明的前途"]
|
||||
|
||||
|
||||
def test_fts_jieba_missing_language_model_note(
|
||||
mem_db: DBConnection, monkeypatch, tmp_path
|
||||
):
|
||||
missing_root = tmp_path / "missing-language-models"
|
||||
monkeypatch.setenv("LANCE_LANGUAGE_MODEL_HOME", str(missing_root))
|
||||
table = mem_db.create_table(
|
||||
"test_missing_jieba_model",
|
||||
data=pa.table({"text": ["我们都有光明的前途"]}),
|
||||
)
|
||||
|
||||
with pytest.raises((ValueError, RuntimeError)) as e:
|
||||
table.create_fts_index(
|
||||
"text",
|
||||
base_tokenizer="jieba/default",
|
||||
stem=False,
|
||||
remove_stop_words=False,
|
||||
ascii_folding=False,
|
||||
)
|
||||
|
||||
output = exception_output(e)
|
||||
assert "Invalid directory path:" in output
|
||||
assert "LANCE_LANGUAGE_MODEL_HOME" in output
|
||||
assert "jieba/default" in output
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fts_jieba_missing_language_model_note_async(monkeypatch, tmp_path):
|
||||
missing_root = tmp_path / "missing-language-models"
|
||||
monkeypatch.setenv("LANCE_LANGUAGE_MODEL_HOME", str(missing_root))
|
||||
db = await ldb.connect_async(tmp_path / "async-db")
|
||||
table = await db.create_table(
|
||||
"test_missing_jieba_model_async",
|
||||
data=pa.table({"text": ["我们都有光明的前途"]}),
|
||||
)
|
||||
|
||||
with pytest.raises((ValueError, RuntimeError)) as e:
|
||||
await table.create_index(
|
||||
"text",
|
||||
config=FTS(
|
||||
base_tokenizer="jieba/default",
|
||||
stem=False,
|
||||
remove_stop_words=False,
|
||||
ascii_folding=False,
|
||||
),
|
||||
)
|
||||
|
||||
output = exception_output(e)
|
||||
assert "Invalid directory path:" in output
|
||||
assert "LANCE_LANGUAGE_MODEL_HOME" in output
|
||||
assert "jieba/default" in output
|
||||
|
||||
|
||||
def test_fts_lindera_tokenizer(
|
||||
mem_db: DBConnection, language_model_home, lindera_ipadic
|
||||
):
|
||||
data = pa.table({"text": ["成田国際空港", "東京国際空港", "羽田空港"]})
|
||||
table = mem_db.create_table("test_lindera", data=data)
|
||||
table.create_fts_index(
|
||||
"text",
|
||||
base_tokenizer="lindera/ipadic",
|
||||
stem=False,
|
||||
remove_stop_words=False,
|
||||
ascii_folding=False,
|
||||
)
|
||||
|
||||
results = table.search("成田", query_type="fts").limit(10).to_list()
|
||||
assert [row["text"] for row in results] == ["成田国際空港"]
|
||||
|
||||
|
||||
def test_fts_query_to_json():
|
||||
"""Test that FTS query to_json() produces valid JSON strings with exact format."""
|
||||
|
||||
|
||||
@@ -16,13 +16,11 @@ from lancedb.index import (
|
||||
IvfSq,
|
||||
IvfHnswPq,
|
||||
IvfHnswSq,
|
||||
IvfHnswFlat,
|
||||
IvfRq,
|
||||
Bitmap,
|
||||
LabelList,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
HnswFlat,
|
||||
FTS,
|
||||
)
|
||||
from lancedb.table import IndexStatistics
|
||||
@@ -252,21 +250,6 @@ async def test_create_hnswpq_alias_index(some_table: AsyncTable):
|
||||
assert indices[0].index_type in {"HnswPq", "IvfHnswPq"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_hnswflat_index(some_table: AsyncTable):
|
||||
await some_table.create_index("vector", config=HnswFlat(num_partitions=10))
|
||||
indices = await some_table.list_indices()
|
||||
assert len(indices) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_hnswflat_alias_index(some_table: AsyncTable):
|
||||
await some_table.create_index("vector", config=IvfHnswFlat(num_partitions=5))
|
||||
indices = await some_table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type in {"HnswFlat", "IvfHnswFlat"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_ivfsq_index(some_table: AsyncTable):
|
||||
await some_table.create_index("vector", config=IvfSq(num_partitions=10))
|
||||
@@ -312,7 +295,6 @@ def test_index_statistics_index_type_lists_all_supported_values():
|
||||
"IVF_RQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"IVF_HNSW_FLAT",
|
||||
"FTS",
|
||||
"BTREE",
|
||||
"BITMAP",
|
||||
|
||||
@@ -9,6 +9,21 @@ from lancedb import DBConnection, Table, connect
|
||||
from lancedb.permutation import Permutation, Permutations, permutation_builder
|
||||
|
||||
|
||||
def test_permutation_persistence(tmp_path):
|
||||
db = connect(tmp_path)
|
||||
tbl = db.create_table("test_table", pa.table({"x": range(100), "y": range(100)}))
|
||||
|
||||
permutation_tbl = (
|
||||
permutation_builder(tbl).shuffle().persist(db, "test_permutation").execute()
|
||||
)
|
||||
assert permutation_tbl.count_rows() == 100
|
||||
|
||||
re_open = db.open_table("test_permutation")
|
||||
assert re_open.count_rows() == 100
|
||||
|
||||
assert permutation_tbl.to_arrow() == re_open.to_arrow()
|
||||
|
||||
|
||||
def test_split_random_ratios(mem_db):
|
||||
"""Test random splitting with ratios."""
|
||||
tbl = mem_db.create_table(
|
||||
|
||||
@@ -6,8 +6,6 @@ import contextlib
|
||||
from datetime import timedelta
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
@@ -1232,82 +1230,3 @@ def test_background_loop_cancellation(exception):
|
||||
with pytest.raises(exception):
|
||||
loop.run(None)
|
||||
mock_future.cancel.assert_called_once()
|
||||
|
||||
|
||||
def _remote_fork_child(port: int, queue) -> None:
|
||||
# Build a fresh Connection in the child so we exercise the at-fork-child
|
||||
# tokio runtime reset rather than relying on an inherited reqwest client.
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
queue.put(db.table_names())
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_remote_connection_after_fork():
|
||||
"""A freshly-built remote Connection in a forked child should not hang.
|
||||
|
||||
The pyo3-async-runtimes tokio runtime would otherwise be inherited from
|
||||
the parent with dead worker threads; the at-fork-child handler in our
|
||||
runtime module rebuilds it on first use in the child.
|
||||
"""
|
||||
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"tables": []}')
|
||||
|
||||
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
|
||||
port = server.server_address[1]
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.start()
|
||||
try:
|
||||
# Hit the server in the parent first so the runtime + LOOP are warm
|
||||
# before fork; a fresh child must still succeed.
|
||||
parent_db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
assert parent_db.table_names() == []
|
||||
|
||||
ctx = mp.get_context("fork")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(target=_remote_fork_child, args=(port, queue))
|
||||
proc.start()
|
||||
proc.join(timeout=15)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail("Remote connection hung after fork")
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no result"
|
||||
assert queue.get() == []
|
||||
|
||||
# Parent connection must still be usable after the child returned.
|
||||
assert parent_db.table_names() == []
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
@@ -11,7 +11,7 @@ from unittest.mock import patch
|
||||
|
||||
import lancedb
|
||||
from lancedb.dependencies import _PANDAS_AVAILABLE
|
||||
from lancedb.index import HnswFlat, HnswPq, HnswSq, IvfPq
|
||||
from lancedb.index import HnswPq, HnswSq, IvfPq
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
import pyarrow as pa
|
||||
@@ -917,21 +917,6 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
"my_vector", replace=True, config=expected_config, name=None, train=True
|
||||
)
|
||||
|
||||
table.create_index(
|
||||
vector_column_name="my_vector",
|
||||
metric="cosine",
|
||||
index_type="IVF_HNSW_FLAT",
|
||||
sample_rate=0.1,
|
||||
m=29,
|
||||
ef_construction=10,
|
||||
)
|
||||
expected_config = HnswFlat(
|
||||
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"my_vector", replace=True, config=expected_config, name=None, train=True
|
||||
)
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_name_and_train_parameters(
|
||||
|
||||
@@ -1,29 +1,14 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import functools
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import sys
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from lancedb.permutation import Permutation, Permutations, permutation_builder
|
||||
from lancedb.util import tbl_to_tensor
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
torch = pytest.importorskip("torch")
|
||||
|
||||
|
||||
def _open_native_table(uri: str, table_name: str):
|
||||
"""Top-level connection factory used by the explicit-factory pickle test.
|
||||
|
||||
Defined at module scope so that pickle can resolve it by name in the
|
||||
worker / unpickling process.
|
||||
"""
|
||||
return lancedb.connect(uri).open_table(table_name)
|
||||
|
||||
|
||||
def test_table_dataloader(mem_db):
|
||||
table = mem_db.create_table("test_table", pa.table({"a": range(1000)}))
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
@@ -55,156 +40,3 @@ def test_permutation_dataloader(mem_db):
|
||||
for batch in dataloader:
|
||||
assert batch.size(0) == 1
|
||||
assert batch.size(1) == 10
|
||||
|
||||
|
||||
def test_permutation_is_picklable(tmp_db):
|
||||
"""A Permutation must be picklable so it can be used with PyTorch's
|
||||
DataLoader when num_workers > 0 (which uses multiprocessing and pickles
|
||||
the dataset to pass it to worker processes)."""
|
||||
table = tmp_db.create_table("test_table", pa.table({"a": range(1000)}))
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
pickled = pickle.dumps(permutation)
|
||||
restored = pickle.loads(pickled)
|
||||
|
||||
assert len(restored) == 1000
|
||||
rows = restored.__getitems__([0, 1, 2])
|
||||
assert rows == [{"a": 0}, {"a": 1}, {"a": 2}]
|
||||
|
||||
|
||||
def test_permutation_with_memory_base_is_picklable(mem_db):
|
||||
"""An in-memory base table is inlined into the pickle as Arrow IPC bytes
|
||||
and rebuilt on the other side as an in-memory LanceTable, so the
|
||||
Permutation round-trips even though the original database can't be
|
||||
reopened across processes."""
|
||||
table = mem_db.create_table("test_table", pa.table({"a": range(50)}))
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
|
||||
assert len(restored) == 50
|
||||
assert restored.__getitems__([0, 10, 49]) == [{"a": 0}, {"a": 10}, {"a": 49}]
|
||||
|
||||
|
||||
def test_permutation_dataloader_multiprocessing(tmp_db):
|
||||
"""Using a Permutation with a PyTorch DataLoader that has num_workers > 0
|
||||
must work end-to-end. Each worker process gets a pickled copy of the
|
||||
dataset and reads batches from it."""
|
||||
table = tmp_db.create_table("test_table", pa.table({"a": range(1000)}))
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
shuffle=True,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
seen += batch["a"].size(0)
|
||||
assert seen == 1000
|
||||
|
||||
|
||||
def test_permutation_pickle_with_connection_factory(tmp_path):
|
||||
"""When the user provides a connection_factory, pickling should round-trip
|
||||
through that factory rather than introspecting the connection URI. Useful
|
||||
for remote / cloud connections where the URI alone isn't reopenable."""
|
||||
db = lancedb.connect(tmp_path)
|
||||
db.create_table("test_table", pa.table({"a": range(50)}))
|
||||
|
||||
factory = functools.partial(_open_native_table, str(tmp_path))
|
||||
permutation = Permutation.identity(factory("test_table")).with_connection_factory(
|
||||
factory
|
||||
)
|
||||
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
|
||||
assert len(restored) == 50
|
||||
# The factory survives pickling and is what powered base-table reopen.
|
||||
assert restored.connection_factory is not None
|
||||
assert restored.connection_factory.func is _open_native_table
|
||||
assert restored.__getitems__([0, 1, 2]) == [{"a": 0}, {"a": 1}, {"a": 2}]
|
||||
|
||||
|
||||
def test_permutation_with_builder_is_picklable(tmp_db):
|
||||
"""A Permutation built from a non-identity permutation table must round-trip
|
||||
through pickle while preserving the row order defined by the permutation."""
|
||||
table = tmp_db.create_table("test_table", pa.table({"a": range(100)}))
|
||||
perm_tbl = (
|
||||
permutation_builder(table)
|
||||
.split_random(ratios=[0.8, 0.2], seed=42, split_names=["train", "test"])
|
||||
.shuffle(seed=42)
|
||||
.execute()
|
||||
)
|
||||
permutations = Permutations(table, perm_tbl)
|
||||
permutation = permutations["train"]
|
||||
|
||||
indices = list(range(len(permutation)))
|
||||
expected = permutation.__getitems__(indices)
|
||||
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
|
||||
assert len(restored) == len(permutation)
|
||||
assert restored.__getitems__(indices) == expected
|
||||
|
||||
|
||||
def _multiworker_dataloader_target(db_uri: str, result_queue):
|
||||
import lancedb
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
db = lancedb.connect(db_uri)
|
||||
table = db.open_table("test_table")
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="fork",
|
||||
)
|
||||
count = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
count += 1
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_permutation_dataloader_fork_workers(tmp_path):
|
||||
"""A Permutation used by a fork-based DataLoader should not hang.
|
||||
|
||||
PyTorch's DataLoader uses fork-based multiprocessing by default on Linux.
|
||||
LanceDB drives async work through a background asyncio thread that does
|
||||
not survive a fork, so any LOOP.run() in a worker blocks forever.
|
||||
"""
|
||||
import lancedb
|
||||
|
||||
db_uri = str(tmp_path / "db")
|
||||
db = lancedb.connect(db_uri)
|
||||
db.create_table("test_table", pa.table({"a": list(range(1000))}))
|
||||
|
||||
ctx = mp.get_context("spawn")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(target=_multiworker_dataloader_target, args=(db_uri, queue))
|
||||
proc.start()
|
||||
proc.join(timeout=30)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail("Permutation hung when iterated in a fork-based DataLoader worker")
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 100
|
||||
|
||||
@@ -3,8 +3,6 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::error::PythonErrorExt;
|
||||
use crate::runtime::future_into_py;
|
||||
use arrow::{
|
||||
datatypes::SchemaRef,
|
||||
pyarrow::{IntoPyArrow, ToPyArrow},
|
||||
@@ -14,6 +12,9 @@ use lancedb::arrow::SendableRecordBatchStream;
|
||||
use pyo3::{
|
||||
Bound, Py, PyAny, PyRef, PyResult, Python, exceptions::PyStopAsyncIteration, pyclass, pymethods,
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
use crate::error::PythonErrorExt;
|
||||
|
||||
#[pyclass]
|
||||
pub struct RecordBatchStream {
|
||||
|
||||
@@ -7,12 +7,6 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
error::PythonErrorExt,
|
||||
namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
|
||||
runtime::future_into_py,
|
||||
table::Table,
|
||||
};
|
||||
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
|
||||
use lancedb::{
|
||||
connection::Connection as LanceConnection,
|
||||
@@ -26,6 +20,13 @@ use pyo3::{
|
||||
pyclass, pyfunction, pymethods,
|
||||
types::{PyDict, PyDictMethods},
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
use crate::{
|
||||
error::PythonErrorExt,
|
||||
namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
|
||||
table::Table,
|
||||
};
|
||||
|
||||
#[pyclass]
|
||||
pub struct Connection {
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use lancedb::index::vector::{
|
||||
IvfFlatIndexBuilder, IvfHnswFlatIndexBuilder, IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder,
|
||||
IvfPqIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder,
|
||||
};
|
||||
use lancedb::index::vector::{IvfFlatIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder};
|
||||
use lancedb::index::{
|
||||
Index as LanceDbIndex,
|
||||
scalar::{BTreeIndexBuilder, FtsIndexBuilder},
|
||||
vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder},
|
||||
};
|
||||
use pyo3::IntoPyObject;
|
||||
use pyo3::types::PyStringMethods;
|
||||
@@ -164,26 +162,8 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
|
||||
}
|
||||
Ok(LanceDbIndex::IvfHnswSq(hnsw_sq_builder))
|
||||
}
|
||||
"HnswFlat" => {
|
||||
let params = source.extract::<IvfHnswFlatParams>()?;
|
||||
let distance_type = parse_distance_type(params.distance_type)?;
|
||||
let mut hnsw_flat_builder = IvfHnswFlatIndexBuilder::default()
|
||||
.distance_type(distance_type)
|
||||
.max_iterations(params.max_iterations)
|
||||
.sample_rate(params.sample_rate)
|
||||
.num_edges(params.m)
|
||||
.ef_construction(params.ef_construction);
|
||||
if let Some(num_partitions) = params.num_partitions {
|
||||
hnsw_flat_builder = hnsw_flat_builder.num_partitions(num_partitions);
|
||||
}
|
||||
if let Some(target_partition_size) = params.target_partition_size {
|
||||
hnsw_flat_builder =
|
||||
hnsw_flat_builder.target_partition_size(target_partition_size);
|
||||
}
|
||||
Ok(LanceDbIndex::IvfHnswFlat(hnsw_flat_builder))
|
||||
}
|
||||
not_supported => Err(PyValueError::new_err(format!(
|
||||
"Invalid index type '{}'. Must be one of BTree, Bitmap, LabelList, FTS, IvfPq, IvfSq, IvfHnswPq, IvfHnswSq, or IvfHnswFlat",
|
||||
"Invalid index type '{}'. Must be one of BTree, Bitmap, LabelList, FTS, IvfPq, IvfSq, IvfHnswPq, or IvfHnswSq",
|
||||
not_supported
|
||||
))),
|
||||
}
|
||||
@@ -270,17 +250,6 @@ struct IvfHnswSqParams {
|
||||
target_partition_size: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
struct IvfHnswFlatParams {
|
||||
distance_type: String,
|
||||
num_partitions: Option<u32>,
|
||||
max_iterations: u32,
|
||||
sample_rate: u32,
|
||||
m: u32,
|
||||
ef_construction: u32,
|
||||
target_partition_size: Option<u32>,
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
/// A description of an index currently configured on a column
|
||||
pub struct IndexConfig {
|
||||
|
||||
@@ -28,7 +28,6 @@ pub mod index;
|
||||
pub mod namespace;
|
||||
pub mod permutation;
|
||||
pub mod query;
|
||||
pub mod runtime;
|
||||
pub mod session;
|
||||
pub mod table;
|
||||
pub mod util;
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::{
|
||||
arrow::RecordBatchStream, error::PythonErrorExt, runtime::future_into_py, table::Table,
|
||||
arrow::RecordBatchStream, connection::Connection, error::PythonErrorExt, table::Table,
|
||||
};
|
||||
use arrow::pyarrow::{PyArrowType, ToPyArrow};
|
||||
use lancedb::{
|
||||
@@ -21,6 +21,7 @@ use pyo3::{
|
||||
pyclass, pymethods,
|
||||
types::{PyAnyMethods, PyDict, PyDictMethods, PyType},
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
fn table_from_py<'a>(table: Bound<'a, PyAny>) -> PyResult<Bound<'a, Table>> {
|
||||
if table.hasattr("_inner")? {
|
||||
@@ -79,6 +80,24 @@ impl PyAsyncPermutationBuilder {
|
||||
|
||||
#[pymethods]
|
||||
impl PyAsyncPermutationBuilder {
|
||||
#[pyo3(signature = (database, table_name))]
|
||||
pub fn persist(
|
||||
slf: PyRefMut<'_, Self>,
|
||||
database: Bound<'_, PyAny>,
|
||||
table_name: String,
|
||||
) -> PyResult<Self> {
|
||||
let conn = if database.hasattr("_conn")? {
|
||||
database
|
||||
.getattr("_conn")?
|
||||
.getattr("_inner")?
|
||||
.cast_into::<Connection>()?
|
||||
} else {
|
||||
database.getattr("_inner")?.cast_into::<Connection>()?
|
||||
};
|
||||
let database = conn.borrow().database()?;
|
||||
slf.modify(|builder| builder.persist(database, table_name))
|
||||
}
|
||||
|
||||
#[pyo3(signature = (*, ratios=None, counts=None, fixed=None, seed=None, split_names=None))]
|
||||
pub fn split_random(
|
||||
slf: PyRefMut<'_, Self>,
|
||||
|
||||
@@ -4,11 +4,6 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::expr::PyExpr;
|
||||
use crate::runtime::future_into_py;
|
||||
use crate::util::parse_distance_type;
|
||||
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
|
||||
use crate::{error::PythonErrorExt, index::class_name};
|
||||
use arrow::array::Array;
|
||||
use arrow::array::ArrayData;
|
||||
use arrow::array::make_array;
|
||||
@@ -41,6 +36,12 @@ use pyo3::types::{PyDict, PyString};
|
||||
use pyo3::{Borrowed, FromPyObject, exceptions::PyRuntimeError};
|
||||
use pyo3::{PyErr, pyclass};
|
||||
use pyo3::{exceptions::PyValueError, intern};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
use crate::expr::PyExpr;
|
||||
use crate::util::parse_distance_type;
|
||||
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
|
||||
use crate::{error::PythonErrorExt, index::class_name};
|
||||
|
||||
impl<'a, 'py> FromPyObject<'a, 'py> for PyLanceDB<FtsQuery> {
|
||||
type Error = PyErr;
|
||||
|
||||
@@ -1,142 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Fork-safe wrapper around tokio + pyo3-async-runtimes.
|
||||
//!
|
||||
//! `pyo3_async_runtimes::tokio` keeps its multi-threaded runtime in a
|
||||
//! `OnceLock` that can never be replaced. Tokio's worker threads do not
|
||||
//! survive `fork()`, so once a child inherits a "frozen" runtime, every
|
||||
//! `future_into_py` call hangs forever.
|
||||
//!
|
||||
//! We sidestep the global by routing every future through our own
|
||||
//! [`LanceRuntime`] (a [`pyo3_async_runtimes::generic::Runtime`] impl) backed
|
||||
//! by an [`AtomicPtr`] to a tokio runtime that we own. A `pthread_atfork`
|
||||
//! child handler nulls the pointer; the next `spawn` rebuilds the runtime in
|
||||
//! the child. This mirrors the pattern used in the Lance Python bindings.
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
|
||||
|
||||
use pyo3::{Bound, PyAny, PyResult, Python, conversion::IntoPyObject};
|
||||
use pyo3_async_runtimes::{
|
||||
TaskLocals,
|
||||
generic::{ContextExt, JoinError, Runtime},
|
||||
};
|
||||
use tokio::{runtime, task};
|
||||
|
||||
static RUNTIME: AtomicPtr<runtime::Runtime> = AtomicPtr::new(std::ptr::null_mut());
|
||||
static RUNTIME_INSTALLING: AtomicBool = AtomicBool::new(false);
|
||||
static ATFORK_INSTALLED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
fn create_runtime() -> runtime::Runtime {
|
||||
runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("lancedb-tokio-worker")
|
||||
.build()
|
||||
.expect("Failed to build tokio runtime")
|
||||
}
|
||||
|
||||
fn get_runtime() -> &'static runtime::Runtime {
|
||||
loop {
|
||||
let ptr = RUNTIME.load(Ordering::SeqCst);
|
||||
if !ptr.is_null() {
|
||||
return unsafe { &*ptr };
|
||||
}
|
||||
if !RUNTIME_INSTALLING.fetch_or(true, Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
std::thread::yield_now();
|
||||
}
|
||||
if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
|
||||
install_atfork();
|
||||
}
|
||||
let new_ptr = Box::into_raw(Box::new(create_runtime()));
|
||||
RUNTIME.store(new_ptr, Ordering::SeqCst);
|
||||
unsafe { &*new_ptr }
|
||||
}
|
||||
|
||||
/// Runs in async-signal context after `fork()` in the child. We can only
|
||||
/// touch atomics here; we deliberately leak the previous runtime because
|
||||
/// dropping a tokio `Runtime` would try to join its (now-dead) worker
|
||||
/// threads and hang.
|
||||
extern "C" fn atfork_child() {
|
||||
RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
|
||||
RUNTIME_INSTALLING.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
fn install_atfork() {
|
||||
unsafe { libc::pthread_atfork(None, None, Some(atfork_child)) };
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn install_atfork() {}
|
||||
|
||||
/// Marker type implementing [`Runtime`] over our fork-safe runtime slot.
|
||||
pub struct LanceRuntime;
|
||||
|
||||
/// Newtype wrapper around `tokio::task::JoinError` so we can implement the
|
||||
/// foreign [`JoinError`] trait without violating orphan rules.
|
||||
pub struct LanceJoinError(task::JoinError);
|
||||
|
||||
impl JoinError for LanceJoinError {
|
||||
fn is_panic(&self) -> bool {
|
||||
self.0.is_panic()
|
||||
}
|
||||
fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
|
||||
self.0.into_panic()
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime for LanceRuntime {
|
||||
type JoinError = LanceJoinError;
|
||||
type JoinHandle = Pin<Box<dyn Future<Output = Result<(), Self::JoinError>> + Send>>;
|
||||
|
||||
fn spawn<F>(fut: F) -> Self::JoinHandle
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
let handle = get_runtime().spawn(fut);
|
||||
Box::pin(async move { handle.await.map_err(LanceJoinError) })
|
||||
}
|
||||
|
||||
fn spawn_blocking<F>(f: F) -> Self::JoinHandle
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
let handle = get_runtime().spawn_blocking(f);
|
||||
Box::pin(async move { handle.await.map_err(LanceJoinError) })
|
||||
}
|
||||
}
|
||||
|
||||
tokio::task_local! {
|
||||
static TASK_LOCALS: std::cell::OnceCell<TaskLocals>;
|
||||
}
|
||||
|
||||
impl ContextExt for LanceRuntime {
|
||||
fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
|
||||
where
|
||||
F: Future<Output = R> + Send + 'static,
|
||||
{
|
||||
let cell = std::cell::OnceCell::new();
|
||||
cell.set(locals).unwrap();
|
||||
Box::pin(TASK_LOCALS.scope(cell, fut))
|
||||
}
|
||||
|
||||
fn get_task_locals() -> Option<TaskLocals> {
|
||||
TASK_LOCALS
|
||||
.try_with(|c| c.get().cloned())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop-in replacement for `pyo3_async_runtimes::tokio::future_into_py` that
|
||||
/// uses our fork-safe runtime.
|
||||
pub fn future_into_py<F, T>(py: Python<'_>, fut: F) -> PyResult<Bound<'_, PyAny>>
|
||||
where
|
||||
F: Future<Output = PyResult<T>> + Send + 'static,
|
||||
T: for<'py> IntoPyObject<'py> + Send + 'static,
|
||||
{
|
||||
pyo3_async_runtimes::generic::future_into_py::<LanceRuntime, _, T>(py, fut)
|
||||
}
|
||||
@@ -2,7 +2,6 @@
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use crate::runtime::future_into_py;
|
||||
use crate::{
|
||||
connection::Connection,
|
||||
error::PythonErrorExt,
|
||||
@@ -25,6 +24,7 @@ use pyo3::{
|
||||
pyclass, pymethods,
|
||||
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
mod scannable;
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ lance-datafusion.workspace = true
|
||||
lance-datagen = { workspace = true }
|
||||
lance-file = { workspace = true }
|
||||
lance-io = { workspace = true }
|
||||
lance-index = { workspace = true, features = ["tokenizer-jieba", "tokenizer-lindera"] }
|
||||
lance-index = { workspace = true }
|
||||
lance-table = { workspace = true }
|
||||
lance-linalg = { workspace = true }
|
||||
lance-testing = { workspace = true }
|
||||
@@ -108,12 +108,7 @@ test-log = "0.2"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
aws = [
|
||||
"lance/aws",
|
||||
"lance-io/aws",
|
||||
"lance-namespace-impls/dir-aws",
|
||||
"object_store/aws",
|
||||
]
|
||||
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
|
||||
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
|
||||
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
|
||||
azure = [
|
||||
|
||||
@@ -505,15 +505,8 @@ impl ListingDatabase {
|
||||
// Filter out the commit store query param -- it's a lancedb param
|
||||
url.query_pairs_mut().clear();
|
||||
url.query_pairs_mut().extend_pairs(filtered_querys);
|
||||
// Take a copy of the query string so we can propagate it to lance.
|
||||
// `query_pairs_mut()` leaves the URL with `Some("")` even when no
|
||||
// pairs survive (or none existed in the first place), so an empty
|
||||
// string here must be treated the same as "no query" — otherwise
|
||||
// every table URI ends up with a trailing `?`, which makes downstream
|
||||
// sub-paths (e.g. MemWAL gen paths) re-parse as path=<base table> +
|
||||
// query=<sub-path>, causing Lance to find the base table dataset
|
||||
// when looking up the sub-path.
|
||||
let query_string = url.query().filter(|q| !q.is_empty()).map(|s| s.to_string());
|
||||
// Take a copy of the query string so we can propagate it to lance
|
||||
let query_string = url.query().map(|s| s.to_string());
|
||||
// clear the query string so we can use the url as the base uri
|
||||
// use .set_query(None) instead of .set_query("") because the latter
|
||||
// will add a trailing '?' to the url
|
||||
@@ -722,7 +715,7 @@ impl ListingDatabase {
|
||||
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?;
|
||||
for name in names {
|
||||
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
|
||||
let full_path = self.base_path.clone().join(dir_name.clone());
|
||||
let full_path = self.base_path.child(dir_name.clone());
|
||||
|
||||
commit_handler.delete(&full_path).await?;
|
||||
|
||||
@@ -2220,133 +2213,6 @@ mod tests {
|
||||
assert_eq!(uri, expected);
|
||||
}
|
||||
|
||||
/// Regression: connecting via a URL-style URI (which goes through
|
||||
/// `url::Url::parse` and the `query_pairs_mut()` path) must not
|
||||
/// append a trailing `?` to per-table URIs when the input URI has
|
||||
/// no query string.
|
||||
///
|
||||
/// Earlier, `query_pairs_mut().clear()` left the URL with
|
||||
/// `query=Some("")`, which then propagated as a trailing `?` on
|
||||
/// every table URI. Sub-path lookups against that URI (e.g. MemWAL
|
||||
/// `<table_uri>/_mem_wal/<shard>/<rand>_gen_<n>`) re-parsed as
|
||||
/// `path=<base table>` + `query=/_mem_wal/...`, causing
|
||||
/// `Dataset::write` to find the base table dataset and falsely
|
||||
/// report `Dataset already exists`.
|
||||
/// Mirrors the URL-mutation step from
|
||||
/// [`ListingDatabase::connect_with_options`] so we can assert the
|
||||
/// fix without going through filesystem setup (which is awkward
|
||||
/// across platforms — see the `file://` test below).
|
||||
fn capture_query_like_connect(input_uri: &str) -> Option<String> {
|
||||
let mut url = url::Url::parse(input_uri).unwrap();
|
||||
let mut filtered_querys = Vec::new();
|
||||
for (key, value) in url.query_pairs() {
|
||||
if key == ENGINE || key == MIRRORED_STORE {
|
||||
continue;
|
||||
}
|
||||
filtered_querys.push((key.to_string(), value.to_string()));
|
||||
}
|
||||
url.query_pairs_mut().clear();
|
||||
url.query_pairs_mut().extend_pairs(filtered_querys);
|
||||
url.query().filter(|q| !q.is_empty()).map(|s| s.to_string())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_capture_query_treats_empty_as_none() {
|
||||
// No query at all. With the bug, `query_pairs_mut()` left the
|
||||
// URL with `query=Some("")` and we used to propagate that.
|
||||
assert_eq!(
|
||||
capture_query_like_connect("s3://bucket/prefix/"),
|
||||
None,
|
||||
"empty query after mutation must be treated as no query"
|
||||
);
|
||||
|
||||
// Real query is propagated.
|
||||
assert_eq!(
|
||||
capture_query_like_connect("s3://bucket/prefix/?foo=bar"),
|
||||
Some("foo=bar".to_string())
|
||||
);
|
||||
|
||||
// lancedb-internal `engine=` is stripped; nothing remains, so
|
||||
// query_string is None — not Some("").
|
||||
assert_eq!(
|
||||
capture_query_like_connect(&format!("s3://bucket/prefix/?{}=mem", ENGINE)),
|
||||
None
|
||||
);
|
||||
|
||||
// Mixed: drop `engine=`, keep the rest.
|
||||
let captured =
|
||||
capture_query_like_connect(&format!("s3://bucket/prefix/?{}=mem&foo=bar", ENGINE));
|
||||
assert_eq!(captured.as_deref(), Some("foo=bar"));
|
||||
}
|
||||
|
||||
/// Regression: connecting via a URL-style URI (which goes through
|
||||
/// `url::Url::parse` and the `query_pairs_mut()` path) must not
|
||||
/// append a trailing `?` to per-table URIs when the input URI has
|
||||
/// no query string. Sub-path lookups against such a URI (e.g.
|
||||
/// MemWAL `<table_uri>/_mem_wal/<shard>/<rand>_gen_<n>`) re-parse
|
||||
/// as `path=<base table>` + `query=/_mem_wal/...`, causing
|
||||
/// `Dataset::write` to find the base table dataset and falsely
|
||||
/// report `Dataset already exists`.
|
||||
///
|
||||
/// Skipped on Windows: `try_create_dir` does not understand
|
||||
/// `file:///C:/…` paths so `connect_with_options` fails before
|
||||
/// even reaching the URL-mutation logic. The pure URL-mutation
|
||||
/// invariant is covered by
|
||||
/// `test_capture_query_treats_empty_as_none` above, which runs
|
||||
/// on all platforms.
|
||||
#[cfg(not(windows))]
|
||||
#[tokio::test]
|
||||
async fn test_table_uri_url_path_has_no_trailing_question_mark() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
let uri = format!("file://{}", tempdir.path().to_str().unwrap());
|
||||
|
||||
let request = ConnectRequest {
|
||||
uri: uri.clone(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
let db = ListingDatabase::connect_with_options(&request)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
db.query_string, None,
|
||||
"no input query → no captured query_string"
|
||||
);
|
||||
|
||||
let table_uri = db.table_uri("test").unwrap();
|
||||
assert!(
|
||||
!table_uri.ends_with('?'),
|
||||
"table_uri must not have a trailing `?`: {}",
|
||||
table_uri
|
||||
);
|
||||
assert_eq!(table_uri, format!("{}/test.lance", uri));
|
||||
|
||||
// A real query string should still be propagated.
|
||||
let with_query = format!("{}?foo=bar", uri);
|
||||
let request_with_query = ConnectRequest {
|
||||
uri: with_query,
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
let db_with_query = ListingDatabase::connect_with_options(&request_with_query)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(db_with_query.query_string.as_deref(), Some("foo=bar"));
|
||||
let table_uri = db_with_query.table_uri("test").unwrap();
|
||||
assert_eq!(table_uri, format!("{}/test.lance?foo=bar", uri));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_client() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
@@ -13,10 +13,7 @@ use crate::{DistanceType, Error, Result, table::BaseTable};
|
||||
|
||||
use self::{
|
||||
scalar::{BTreeIndexBuilder, BitmapIndexBuilder, LabelListIndexBuilder},
|
||||
vector::{
|
||||
IvfHnswFlatIndexBuilder, IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder,
|
||||
IvfSqIndexBuilder,
|
||||
},
|
||||
vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder, IvfSqIndexBuilder},
|
||||
};
|
||||
|
||||
pub mod scalar;
|
||||
@@ -70,10 +67,6 @@ pub enum Index {
|
||||
/// IVF-HNSW index with Scalar Quantization
|
||||
/// It is a variant of the HNSW algorithm that uses scalar quantization to compress the vectors.
|
||||
IvfHnswSq(IvfHnswSqIndexBuilder),
|
||||
|
||||
/// IVF-HNSW index without quantization.
|
||||
/// Stores raw vectors, providing the highest recall at the cost of more memory and disk space.
|
||||
IvfHnswFlat(IvfHnswFlatIndexBuilder),
|
||||
}
|
||||
|
||||
/// Builder for the create_index operation
|
||||
@@ -297,8 +290,6 @@ pub enum IndexType {
|
||||
IvfHnswPq,
|
||||
#[serde(alias = "IVF_HNSW_SQ")]
|
||||
IvfHnswSq,
|
||||
#[serde(alias = "IVF_HNSW_FLAT")]
|
||||
IvfHnswFlat,
|
||||
// Scalar
|
||||
#[serde(alias = "BTREE")]
|
||||
BTree,
|
||||
@@ -320,7 +311,6 @@ impl std::fmt::Display for IndexType {
|
||||
Self::IvfRq => write!(f, "IVF_RQ"),
|
||||
Self::IvfHnswPq => write!(f, "IVF_HNSW_PQ"),
|
||||
Self::IvfHnswSq => write!(f, "IVF_HNSW_SQ"),
|
||||
Self::IvfHnswFlat => write!(f, "IVF_HNSW_FLAT"),
|
||||
Self::BTree => write!(f, "BTREE"),
|
||||
Self::Bitmap => write!(f, "BITMAP"),
|
||||
Self::LabelList => write!(f, "LABEL_LIST"),
|
||||
@@ -344,7 +334,6 @@ impl std::str::FromStr for IndexType {
|
||||
"IVF_RQ" => Ok(Self::IvfRq),
|
||||
"IVF_HNSW_PQ" => Ok(Self::IvfHnswPq),
|
||||
"IVF_HNSW_SQ" => Ok(Self::IvfHnswSq),
|
||||
"IVF_HNSW_FLAT" => Ok(Self::IvfHnswFlat),
|
||||
_ => Err(Error::InvalidInput {
|
||||
message: format!("the input value {} is not a valid IndexType", value),
|
||||
}),
|
||||
|
||||
@@ -474,46 +474,3 @@ impl IvfHnswSqIndexBuilder {
|
||||
impl_ivf_params_setter!();
|
||||
impl_hnsw_params_setter!();
|
||||
}
|
||||
|
||||
/// Builder for an IVF_HNSW_FLAT index.
|
||||
///
|
||||
/// This index combines IVF partitioning with an HNSW graph per partition,
|
||||
/// storing raw (unquantized) vectors. It offers the highest recall among
|
||||
/// the IVF_HNSW family at the cost of more memory and disk space compared
|
||||
/// to [`IvfHnswSqIndexBuilder`] or [`IvfHnswPqIndexBuilder`].
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct IvfHnswFlatIndexBuilder {
|
||||
// IVF
|
||||
#[serde(rename = "metric_type")]
|
||||
pub(crate) distance_type: DistanceType,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) num_partitions: Option<u32>,
|
||||
pub(crate) sample_rate: u32,
|
||||
pub(crate) max_iterations: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) target_partition_size: Option<u32>,
|
||||
|
||||
// HNSW
|
||||
pub(crate) m: u32,
|
||||
pub(crate) ef_construction: u32,
|
||||
}
|
||||
|
||||
impl Default for IvfHnswFlatIndexBuilder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
distance_type: DistanceType::L2,
|
||||
num_partitions: None,
|
||||
sample_rate: 256,
|
||||
max_iterations: 50,
|
||||
m: 20,
|
||||
ef_construction: 300,
|
||||
target_partition_size: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IvfHnswFlatIndexBuilder {
|
||||
impl_distance_type_setter!();
|
||||
impl_ivf_params_setter!();
|
||||
impl_hnsw_params_setter!();
|
||||
}
|
||||
|
||||
@@ -5,12 +5,11 @@
|
||||
|
||||
use std::{fmt::Formatter, sync::Arc};
|
||||
|
||||
use futures::{StreamExt, TryFutureExt, stream::BoxStream};
|
||||
use futures::{TryFutureExt, stream::BoxStream};
|
||||
use lance::io::WrappingObjectStore;
|
||||
use object_store::{
|
||||
CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
|
||||
ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
|
||||
UploadPart, path::Path,
|
||||
Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
|
||||
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -94,6 +93,20 @@ impl ObjectStore for MirroringObjectStore {
|
||||
self.primary.get_opts(location, options).await
|
||||
}
|
||||
|
||||
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
|
||||
self.primary.head(location).await
|
||||
}
|
||||
|
||||
async fn delete(&self, location: &Path) -> Result<()> {
|
||||
if !location.primary_only() {
|
||||
match self.secondary.delete(location).await {
|
||||
Err(Error::NotFound { .. }) | Ok(_) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
self.primary.delete(location).await
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
|
||||
self.primary.list(prefix)
|
||||
}
|
||||
@@ -102,41 +115,22 @@ impl ObjectStore for MirroringObjectStore {
|
||||
self.primary.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
fn delete_stream(
|
||||
&self,
|
||||
locations: BoxStream<'static, Result<Path>>,
|
||||
) -> BoxStream<'static, Result<Path>> {
|
||||
let primary = self.primary.clone();
|
||||
let secondary = self.secondary.clone();
|
||||
locations
|
||||
.map(move |location| {
|
||||
let primary = primary.clone();
|
||||
let secondary = secondary.clone();
|
||||
async move {
|
||||
let location = location?;
|
||||
if !location.primary_only() {
|
||||
match secondary.delete(&location).await {
|
||||
Err(Error::NotFound { .. }) | Ok(_) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
primary.delete(&location).await?;
|
||||
Ok(location)
|
||||
}
|
||||
})
|
||||
.buffered(10)
|
||||
.boxed()
|
||||
}
|
||||
|
||||
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
|
||||
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
if to.primary_only() {
|
||||
self.primary.copy_opts(from, to, options).await
|
||||
self.primary.copy(from, to).await
|
||||
} else {
|
||||
self.secondary.copy_opts(from, to, options.clone()).await?;
|
||||
self.primary.copy_opts(from, to, options).await?;
|
||||
self.secondary.copy(from, to).await?;
|
||||
self.primary.copy(from, to).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
if !to.primary_only() {
|
||||
self.secondary.copy(from, to).await?;
|
||||
}
|
||||
self.primary.copy_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -10,9 +10,9 @@ use bytes::Bytes;
|
||||
use futures::stream::BoxStream;
|
||||
use lance::io::WrappingObjectStore;
|
||||
use object_store::{
|
||||
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
|
||||
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result as OSResult,
|
||||
UploadPart, path::Path,
|
||||
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
|
||||
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -81,6 +81,11 @@ impl IoTrackingStore {
|
||||
#[async_trait::async_trait]
|
||||
#[deny(clippy::missing_trait_methods)]
|
||||
impl ObjectStore for IoTrackingStore {
|
||||
async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
|
||||
self.record_write(bytes.content_length() as u64);
|
||||
self.target.put(location, bytes).await
|
||||
}
|
||||
|
||||
async fn put_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
@@ -91,6 +96,14 @@ impl ObjectStore for IoTrackingStore {
|
||||
self.target.put_opts(location, bytes, opts).await
|
||||
}
|
||||
|
||||
async fn put_multipart(&self, location: &Path) -> OSResult<Box<dyn MultipartUpload>> {
|
||||
let target = self.target.put_multipart(location).await?;
|
||||
Ok(Box::new(IoTrackingMultipartUpload {
|
||||
target,
|
||||
stats: self.stats.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn put_multipart_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
@@ -103,6 +116,15 @@ impl ObjectStore for IoTrackingStore {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get(&self, location: &Path) -> OSResult<GetResult> {
|
||||
let result = self.target.get(location).await;
|
||||
if let Ok(result) = &result {
|
||||
let num_bytes = result.range.end - result.range.start;
|
||||
self.record_read(num_bytes);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
|
||||
let result = self.target.get_opts(location, options).await;
|
||||
if let Ok(result) = &result {
|
||||
@@ -112,6 +134,14 @@ impl ObjectStore for IoTrackingStore {
|
||||
result
|
||||
}
|
||||
|
||||
async fn get_range(&self, location: &Path, range: std::ops::Range<u64>) -> OSResult<Bytes> {
|
||||
let result = self.target.get_range(location, range).await;
|
||||
if let Ok(result) = &result {
|
||||
self.record_read(result.len() as u64);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn get_ranges(
|
||||
&self,
|
||||
location: &Path,
|
||||
@@ -124,11 +154,20 @@ impl ObjectStore for IoTrackingStore {
|
||||
result
|
||||
}
|
||||
|
||||
fn delete_stream(
|
||||
&self,
|
||||
locations: BoxStream<'static, OSResult<Path>>,
|
||||
) -> BoxStream<'static, OSResult<Path>> {
|
||||
async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
|
||||
self.record_read(0);
|
||||
self.target.head(location).await
|
||||
}
|
||||
|
||||
async fn delete(&self, location: &Path) -> OSResult<()> {
|
||||
self.record_write(0);
|
||||
self.target.delete(location).await
|
||||
}
|
||||
|
||||
fn delete_stream<'a>(
|
||||
&'a self,
|
||||
locations: BoxStream<'a, OSResult<Path>>,
|
||||
) -> BoxStream<'a, OSResult<Path>> {
|
||||
self.target.delete_stream(locations)
|
||||
}
|
||||
|
||||
@@ -151,14 +190,24 @@ impl ObjectStore for IoTrackingStore {
|
||||
self.target.list_with_delimiter(prefix).await
|
||||
}
|
||||
|
||||
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OSResult<()> {
|
||||
async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
|
||||
self.record_write(0);
|
||||
self.target.copy_opts(from, to, options).await
|
||||
self.target.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> OSResult<()> {
|
||||
async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
|
||||
self.record_write(0);
|
||||
self.target.rename_opts(from, to, options).await
|
||||
self.target.rename(from, to).await
|
||||
}
|
||||
|
||||
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
|
||||
self.record_write(0);
|
||||
self.target.rename_if_not_exists(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
|
||||
self.record_write(0);
|
||||
self.target.copy_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1540,7 +1540,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Index::IvfPq(p) => ("IVF_PQ", Some(to_json(p)?)),
|
||||
Index::IvfSq(p) => ("IVF_SQ", Some(to_json(p)?)),
|
||||
Index::IvfHnswSq(p) => ("IVF_HNSW_SQ", Some(to_json(p)?)),
|
||||
Index::IvfHnswFlat(p) => ("IVF_HNSW_FLAT", Some(to_json(p)?)),
|
||||
Index::IvfRq(p) => ("IVF_RQ", Some(to_json(p)?)),
|
||||
Index::BTree(p) => ("BTREE", Some(to_json(p)?)),
|
||||
Index::Bitmap(p) => ("BITMAP", Some(to_json(p)?)),
|
||||
@@ -2069,8 +2068,7 @@ mod tests {
|
||||
use serde_json::json;
|
||||
|
||||
use crate::index::vector::{
|
||||
IvfFlatIndexBuilder, IvfHnswFlatIndexBuilder, IvfHnswSqIndexBuilder, IvfRqIndexBuilder,
|
||||
IvfSqIndexBuilder,
|
||||
IvfFlatIndexBuilder, IvfHnswSqIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder,
|
||||
};
|
||||
use crate::remote::JSON_CONTENT_TYPE;
|
||||
use crate::remote::db::DEFAULT_SERVER_VERSION;
|
||||
@@ -3323,35 +3321,6 @@ mod tests {
|
||||
.ef_construction(500),
|
||||
),
|
||||
),
|
||||
(
|
||||
"IVF_HNSW_FLAT",
|
||||
json!({
|
||||
"metric_type": "l2",
|
||||
"sample_rate": 256,
|
||||
"max_iterations": 50,
|
||||
"m": 20,
|
||||
"ef_construction": 300,
|
||||
}),
|
||||
Index::IvfHnswFlat(Default::default()),
|
||||
),
|
||||
(
|
||||
"IVF_HNSW_FLAT",
|
||||
json!({
|
||||
"metric_type": "cosine",
|
||||
"num_partitions": 64,
|
||||
"sample_rate": 256,
|
||||
"max_iterations": 50,
|
||||
"m": 40,
|
||||
"ef_construction": 500,
|
||||
}),
|
||||
Index::IvfHnswFlat(
|
||||
IvfHnswFlatIndexBuilder::default()
|
||||
.distance_type(DistanceType::Cosine)
|
||||
.num_partitions(64)
|
||||
.num_edges(40)
|
||||
.ef_construction(500),
|
||||
),
|
||||
),
|
||||
(
|
||||
"IVF_SQ",
|
||||
json!({
|
||||
|
||||
@@ -2033,24 +2033,6 @@ impl NativeTable {
|
||||
);
|
||||
Ok(Box::new(lance_idx_params))
|
||||
}
|
||||
Index::IvfHnswFlat(index) => {
|
||||
Self::validate_index_type(field, "IVF HNSW FLAT", supported_vector_data_type)?;
|
||||
let ivf_params = Self::build_ivf_params(
|
||||
index.num_partitions,
|
||||
index.target_partition_size,
|
||||
index.sample_rate,
|
||||
index.max_iterations,
|
||||
);
|
||||
let hnsw_params = HnswBuildParams::default()
|
||||
.num_edges(index.m as usize)
|
||||
.ef_construction(index.ef_construction as usize);
|
||||
let lance_idx_params = VectorIndexParams::ivf_hnsw(
|
||||
index.distance_type.into(),
|
||||
ivf_params,
|
||||
hnsw_params,
|
||||
);
|
||||
Ok(Box::new(lance_idx_params))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2076,8 +2058,7 @@ impl NativeTable {
|
||||
| Index::IvfPq(_)
|
||||
| Index::IvfRq(_)
|
||||
| Index::IvfHnswPq(_)
|
||||
| Index::IvfHnswSq(_)
|
||||
| Index::IvfHnswFlat(_) => IndexType::Vector,
|
||||
| Index::IvfHnswSq(_) => IndexType::Vector,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3195,56 +3176,6 @@ mod tests {
|
||||
assert_eq!(stats.num_unindexed_rows, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index_ivf_hnsw_flat() {
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
|
||||
use rand;
|
||||
use std::iter::repeat_with;
|
||||
|
||||
use crate::index::vector::IvfHnswFlatIndexBuilder;
|
||||
use arrow_array::Float32Array;
|
||||
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = connect(uri).execute().await.unwrap();
|
||||
|
||||
let dimension = 16;
|
||||
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
|
||||
"embeddings",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
dimension,
|
||||
),
|
||||
false,
|
||||
)]));
|
||||
|
||||
let float_arr = Float32Array::from(
|
||||
repeat_with(rand::random::<f32>)
|
||||
.take(512 * dimension as usize)
|
||||
.collect::<Vec<f32>>(),
|
||||
);
|
||||
|
||||
let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
|
||||
let batch = RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap();
|
||||
|
||||
let table = conn.create_table("test", batch).execute().await.unwrap();
|
||||
|
||||
let index = IvfHnswFlatIndexBuilder::default();
|
||||
table
|
||||
.create_index(&["embeddings"], Index::IvfHnswFlat(index))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let index_configs = table.list_indices().await.unwrap();
|
||||
assert_eq!(index_configs.len(), 1);
|
||||
let index = index_configs.into_iter().next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::IvfHnswFlat);
|
||||
assert_eq!(index.columns, vec!["embeddings".to_string()]);
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 512);
|
||||
}
|
||||
|
||||
fn create_fixed_size_list<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray> {
|
||||
let list_type = DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", values.data_type().clone(), true)),
|
||||
|
||||
Reference in New Issue
Block a user