mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-18 20:40:41 +00:00
Adds manifest_enabled for local/native connections so directory namespace manifests can be the source of truth, including migration from directory listing and Azure credential vending feature wiring. Also exposes the option through Rust, Python, and Node bindings with focused validation.
451 lines
17 KiB
Python
451 lines
17 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
|
|
|
|
|
import importlib.metadata
|
|
import os
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import timedelta
|
|
from typing import Dict, Optional, Union, Any, List
|
|
import warnings
|
|
|
|
__version__ = importlib.metadata.version("lancedb")
|
|
|
|
from ._lancedb import connect as lancedb_connect
|
|
from .common import URI, sanitize_uri
|
|
from urllib.parse import urlparse
|
|
from .db import AsyncConnection, DBConnection, LanceDBConnection
|
|
from .remote import ClientConfig
|
|
from .remote.db import RemoteDBConnection
|
|
from .expr import Expr, col, lit, func
|
|
from .schema import vector
|
|
from .table import AsyncTable, Table
|
|
from ._lancedb import Session
|
|
from .namespace import (
|
|
connect_namespace,
|
|
connect_namespace_async,
|
|
LanceNamespaceDBConnection,
|
|
AsyncLanceNamespaceDBConnection,
|
|
)
|
|
|
|
|
|
def _check_s3_bucket_with_dots(
|
|
uri: str, storage_options: Optional[Dict[str, str]]
|
|
) -> None:
|
|
"""
|
|
Check if an S3 URI has a bucket name containing dots and warn if no region
|
|
is specified. S3 buckets with dots cannot use virtual-hosted-style URLs,
|
|
which breaks automatic region detection.
|
|
|
|
See: https://github.com/lancedb/lancedb/issues/1898
|
|
"""
|
|
if not isinstance(uri, str) or not uri.startswith("s3://"):
|
|
return
|
|
|
|
parsed = urlparse(uri)
|
|
bucket = parsed.netloc
|
|
|
|
if "." not in bucket:
|
|
return
|
|
|
|
# Check if region is provided in storage_options
|
|
region_keys = {"region", "aws_region"}
|
|
has_region = storage_options and any(k in storage_options for k in region_keys)
|
|
|
|
if not has_region:
|
|
raise ValueError(
|
|
f"S3 bucket name '{bucket}' contains dots, which prevents automatic "
|
|
f"region detection. Please specify the region explicitly via "
|
|
f"storage_options={{'region': '<your-region>'}} or "
|
|
f"storage_options={{'aws_region': '<your-region>'}}. "
|
|
f"See https://github.com/lancedb/lancedb/issues/1898 for details."
|
|
)
|
|
|
|
|
|
def connect(
|
|
uri: Optional[URI] = None,
|
|
*,
|
|
api_key: Optional[str] = None,
|
|
region: str = "us-east-1",
|
|
host_override: Optional[str] = None,
|
|
read_consistency_interval: Optional[timedelta] = None,
|
|
request_thread_pool: Optional[Union[int, ThreadPoolExecutor]] = None,
|
|
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
|
|
storage_options: Optional[Dict[str, str]] = None,
|
|
session: Optional[Session] = None,
|
|
manifest_enabled: bool = False,
|
|
namespace_client_impl: Optional[str] = None,
|
|
namespace_client_properties: Optional[Dict[str, str]] = None,
|
|
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
|
**kwargs: Any,
|
|
) -> DBConnection:
|
|
"""Connect to a LanceDB database.
|
|
|
|
Parameters
|
|
----------
|
|
uri: str or Path, optional
|
|
The uri of the database. When ``namespace_client_impl`` is provided you may
|
|
omit ``uri`` and connect through a namespace client instead.
|
|
api_key: str, optional
|
|
If presented, connect to LanceDB cloud.
|
|
Otherwise, connect to a database on file system or cloud storage.
|
|
Can be set via environment variable `LANCEDB_API_KEY`.
|
|
region: str, default "us-east-1"
|
|
The region to use for LanceDB Cloud.
|
|
host_override: str, optional
|
|
The override url for LanceDB Cloud.
|
|
read_consistency_interval: timedelta, default None
|
|
(For LanceDB OSS only)
|
|
The interval at which to check for updates to the table from other
|
|
processes. If None, then consistency is not checked. For performance
|
|
reasons, this is the default. For strong consistency, set this to
|
|
zero seconds. Then every read will check for updates from other
|
|
processes. As a compromise, you can set this to a non-zero timedelta
|
|
for eventual consistency. If more than that interval has passed since
|
|
the last check, then the table will be checked for updates. Note: this
|
|
consistency only applies to read operations. Write operations are
|
|
always consistent.
|
|
client_config: ClientConfig or dict, optional
|
|
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
|
the keys are the attributes of the ClientConfig class. If None, then the
|
|
default configuration is used.
|
|
storage_options: dict, optional
|
|
Additional options for the storage backend. See available options at
|
|
<https://docs.lancedb.com/storage/>
|
|
manifest_enabled : bool, default False
|
|
When true for local/native connections, use directory namespace
|
|
manifests as the source of truth for table metadata. Existing
|
|
directory-listed root tables are migrated into the manifest on access.
|
|
session: Session, optional
|
|
(For LanceDB OSS only)
|
|
A session to use for this connection. Sessions allow you to configure
|
|
cache sizes for index and metadata caches, which can significantly
|
|
impact memory use and performance. They can also be re-used across
|
|
multiple connections to share the same cache state.
|
|
namespace_client_impl : str, optional
|
|
When provided along with ``namespace_client_properties``, ``connect``
|
|
returns a namespace-backed connection by delegating to
|
|
:func:`connect_namespace`. The value identifies which namespace
|
|
implementation to load (e.g., ``"dir"`` or ``"rest"``).
|
|
namespace_client_properties : dict, optional
|
|
Configuration to pass to the namespace client implementation. Required
|
|
when ``namespace_client_impl`` is set.
|
|
namespace_client_pushdown_operations : list[str], optional
|
|
Only used when ``namespace_client_properties`` is provided. Forwards to
|
|
:func:`connect_namespace` to control which operations are executed on the
|
|
namespace service (e.g., ``["QueryTable", "CreateTable"]``).
|
|
|
|
Examples
|
|
--------
|
|
|
|
For a local directory, provide a path for the database:
|
|
|
|
>>> import lancedb
|
|
>>> db = lancedb.connect("~/.lancedb")
|
|
|
|
For object storage, use a URI prefix:
|
|
|
|
>>> db = lancedb.connect("s3://my-bucket/lancedb",
|
|
... storage_options={"aws_access_key_id": "***"})
|
|
|
|
Connect to LanceDB cloud:
|
|
|
|
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
|
|
... client_config={"retry_config": {"retries": 5}})
|
|
|
|
Connect to a namespace-backed database:
|
|
|
|
>>> db = lancedb.connect(namespace_client_impl="dir",
|
|
... namespace_client_properties={"root": "/tmp/ns"})
|
|
|
|
Returns
|
|
-------
|
|
conn : DBConnection
|
|
A connection to a LanceDB database.
|
|
"""
|
|
if namespace_client_impl is not None:
|
|
if namespace_client_properties is None:
|
|
raise ValueError(
|
|
"namespace_client_properties must be provided when "
|
|
"namespace_client_impl is set"
|
|
)
|
|
if kwargs:
|
|
raise ValueError(f"Unknown keyword arguments: {kwargs}")
|
|
return connect_namespace(
|
|
namespace_client_impl,
|
|
namespace_client_properties,
|
|
read_consistency_interval=read_consistency_interval,
|
|
storage_options=storage_options,
|
|
session=session,
|
|
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
|
)
|
|
|
|
if namespace_client_properties is not None and not manifest_enabled:
|
|
raise ValueError(
|
|
"namespace_client_impl must be provided when using "
|
|
"namespace_client_properties unless manifest_enabled=True"
|
|
)
|
|
|
|
if namespace_client_pushdown_operations is not None:
|
|
raise ValueError(
|
|
"namespace_client_pushdown_operations is only valid when "
|
|
"connecting through a namespace"
|
|
)
|
|
if uri is None:
|
|
raise ValueError(
|
|
"uri is required when not connecting through a namespace client"
|
|
)
|
|
if isinstance(uri, str) and uri.startswith("db://"):
|
|
if api_key is None:
|
|
api_key = os.environ.get("LANCEDB_API_KEY")
|
|
if api_key is None:
|
|
raise ValueError(f"api_key is required to connect to LanceDB cloud: {uri}")
|
|
if isinstance(request_thread_pool, int):
|
|
request_thread_pool = ThreadPoolExecutor(request_thread_pool)
|
|
return RemoteDBConnection(
|
|
uri,
|
|
api_key,
|
|
region,
|
|
host_override,
|
|
# TODO: remove this (deprecation warning downstream)
|
|
request_thread_pool=request_thread_pool,
|
|
client_config=client_config,
|
|
storage_options=storage_options,
|
|
**kwargs,
|
|
)
|
|
_check_s3_bucket_with_dots(str(uri), storage_options)
|
|
|
|
if kwargs:
|
|
raise ValueError(f"Unknown keyword arguments: {kwargs}")
|
|
|
|
return LanceDBConnection(
|
|
uri,
|
|
read_consistency_interval=read_consistency_interval,
|
|
storage_options=storage_options,
|
|
session=session,
|
|
manifest_enabled=manifest_enabled,
|
|
namespace_client_properties=namespace_client_properties,
|
|
)
|
|
|
|
|
|
WORKER_PROPERTY_PREFIX = "_lancedb_worker_"
|
|
|
|
|
|
def _apply_worker_overrides(props: dict[str, str]) -> dict[str, str]:
|
|
"""Apply worker property overrides.
|
|
|
|
Any key starting with ``_lancedb_worker_`` is extracted, the prefix
|
|
is stripped, and the resulting key-value pair is put back into the
|
|
map (overriding the existing value if present). The original
|
|
prefixed key is removed.
|
|
"""
|
|
worker_keys = [k for k in props if k.startswith(WORKER_PROPERTY_PREFIX)]
|
|
if not worker_keys:
|
|
return props
|
|
result = dict(props)
|
|
for key in worker_keys:
|
|
value = result.pop(key)
|
|
real_key = key[len(WORKER_PROPERTY_PREFIX) :]
|
|
result[real_key] = value
|
|
return result
|
|
|
|
|
|
def deserialize_conn(
|
|
data: str,
|
|
*,
|
|
for_worker: bool = False,
|
|
) -> DBConnection:
|
|
"""Reconstruct a DBConnection from a serialized string.
|
|
|
|
The string must have been produced by
|
|
:meth:`DBConnection.serialize`.
|
|
|
|
Parameters
|
|
----------
|
|
data : str
|
|
String produced by ``serialize()``.
|
|
for_worker : bool, default False
|
|
When ``True``, any namespace client property whose key starts
|
|
with ``_lancedb_worker_`` has that prefix stripped and the
|
|
value overrides the corresponding property. For example,
|
|
``_lancedb_worker_uri`` replaces ``uri``.
|
|
|
|
Returns
|
|
-------
|
|
DBConnection
|
|
A new connection matching the serialized state.
|
|
"""
|
|
import json
|
|
|
|
parsed = json.loads(data)
|
|
connection_type = parsed.get("connection_type")
|
|
|
|
rci_secs = parsed.get("read_consistency_interval_seconds")
|
|
rci = timedelta(seconds=rci_secs) if rci_secs is not None else None
|
|
storage_options = parsed.get("storage_options")
|
|
|
|
if connection_type == "namespace":
|
|
props = dict(parsed.get("namespace_client_properties") or {})
|
|
if for_worker:
|
|
props = _apply_worker_overrides(props)
|
|
return connect_namespace(
|
|
namespace_client_impl=parsed["namespace_client_impl"],
|
|
namespace_client_properties=props,
|
|
read_consistency_interval=rci,
|
|
storage_options=storage_options,
|
|
namespace_client_pushdown_operations=parsed.get(
|
|
"namespace_client_pushdown_operations"
|
|
),
|
|
)
|
|
elif connection_type == "local":
|
|
return LanceDBConnection(
|
|
parsed["uri"],
|
|
read_consistency_interval=rci,
|
|
storage_options=storage_options,
|
|
manifest_enabled=parsed.get("manifest_enabled", False),
|
|
namespace_client_properties=parsed.get("namespace_client_properties"),
|
|
)
|
|
else:
|
|
raise ValueError(f"Unknown connection_type: {connection_type}")
|
|
|
|
|
|
async def connect_async(
|
|
uri: URI,
|
|
*,
|
|
api_key: Optional[str] = None,
|
|
region: str = "us-east-1",
|
|
host_override: Optional[str] = None,
|
|
read_consistency_interval: Optional[timedelta] = None,
|
|
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
|
|
storage_options: Optional[Dict[str, str]] = None,
|
|
session: Optional[Session] = None,
|
|
manifest_enabled: bool = False,
|
|
namespace_client_properties: Optional[Dict[str, str]] = None,
|
|
) -> AsyncConnection:
|
|
"""Connect to a LanceDB database.
|
|
|
|
Parameters
|
|
----------
|
|
uri: str or Path
|
|
The uri of the database.
|
|
api_key: str, optional
|
|
If present, connect to LanceDB cloud.
|
|
Otherwise, connect to a database on file system or cloud storage.
|
|
Can be set via environment variable `LANCEDB_API_KEY`.
|
|
region: str, default "us-east-1"
|
|
The region to use for LanceDB Cloud.
|
|
host_override: str, optional
|
|
The override url for LanceDB Cloud.
|
|
read_consistency_interval: timedelta, default None
|
|
(For LanceDB OSS only)
|
|
The interval at which to check for updates to the table from other
|
|
processes. If None, then consistency is not checked. For performance
|
|
reasons, this is the default. For strong consistency, set this to
|
|
zero seconds. Then every read will check for updates from other
|
|
processes. As a compromise, you can set this to a non-zero timedelta
|
|
for eventual consistency. If more than that interval has passed since
|
|
the last check, then the table will be checked for updates. Note: this
|
|
consistency only applies to read operations. Write operations are
|
|
always consistent.
|
|
client_config: ClientConfig or dict, optional
|
|
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
|
the keys are the attributes of the ClientConfig class. If None, then the
|
|
default configuration is used.
|
|
storage_options: dict, optional
|
|
Additional options for the storage backend. See available options at
|
|
<https://docs.lancedb.com/storage/>
|
|
session: Session, optional
|
|
(For LanceDB OSS only)
|
|
A session to use for this connection. Sessions allow you to configure
|
|
cache sizes for index and metadata caches, which can significantly
|
|
impact memory use and performance. They can also be re-used across
|
|
multiple connections to share the same cache state.
|
|
manifest_enabled : bool, default False
|
|
When true for local/native connections, use directory namespace
|
|
manifests as the source of truth for table metadata. Existing
|
|
directory-listed root tables are migrated into the manifest on access.
|
|
namespace_client_properties : dict, optional
|
|
Additional directory namespace client properties to use with
|
|
``manifest_enabled=True``.
|
|
|
|
Examples
|
|
--------
|
|
|
|
>>> import lancedb
|
|
>>> async def doctest_example():
|
|
... # For a local directory, provide a path to the database
|
|
... db = await lancedb.connect_async("~/.lancedb")
|
|
... # For object storage, use a URI prefix
|
|
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
|
|
... storage_options={
|
|
... "aws_access_key_id": "***"})
|
|
... # Connect to LanceDB cloud
|
|
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
|
|
... client_config={
|
|
... "retry_config": {"retries": 5}})
|
|
|
|
Returns
|
|
-------
|
|
conn : AsyncConnection
|
|
A connection to a LanceDB database.
|
|
"""
|
|
if read_consistency_interval is not None:
|
|
read_consistency_interval_secs = read_consistency_interval.total_seconds()
|
|
else:
|
|
read_consistency_interval_secs = None
|
|
|
|
if isinstance(client_config, dict):
|
|
client_config = ClientConfig(**client_config)
|
|
|
|
_check_s3_bucket_with_dots(str(uri), storage_options)
|
|
|
|
return AsyncConnection(
|
|
await lancedb_connect(
|
|
sanitize_uri(uri),
|
|
api_key,
|
|
region,
|
|
host_override,
|
|
read_consistency_interval_secs,
|
|
client_config,
|
|
storage_options,
|
|
session,
|
|
manifest_enabled,
|
|
namespace_client_properties,
|
|
)
|
|
)
|
|
|
|
|
|
__all__ = [
|
|
"connect",
|
|
"connect_async",
|
|
"connect_namespace",
|
|
"connect_namespace_async",
|
|
"AsyncConnection",
|
|
"AsyncLanceNamespaceDBConnection",
|
|
"AsyncTable",
|
|
"col",
|
|
"Expr",
|
|
"func",
|
|
"lit",
|
|
"URI",
|
|
"sanitize_uri",
|
|
"vector",
|
|
"DBConnection",
|
|
"LanceDBConnection",
|
|
"LanceNamespaceDBConnection",
|
|
"RemoteDBConnection",
|
|
"Session",
|
|
"Table",
|
|
"__version__",
|
|
]
|
|
|
|
|
|
def __warn_on_fork():
|
|
warnings.warn(
|
|
"lance is not fork-safe. If you are using multiprocessing, use spawn instead.",
|
|
)
|
|
|
|
|
|
if hasattr(os, "register_at_fork"):
|
|
os.register_at_fork(before=__warn_on_fork) # type: ignore[attr-defined]
|