feat(python)!: async-sync feature parity on Connections (#1905)

Closes #1791
Closes #1764
Closes #1897 (this change makes it unnecessary)

BREAKING CHANGE: when using an Azure connection string (`az://...`), the call
to `connect` will now fail if the Azure storage credentials are not set. This
breaks the previous behaviour, where `connect` itself succeeded and the
failure surfaced only later, when the user invoked methods on the connection.
This commit is contained in:
Bert
2024-12-05 14:54:39 -05:00
committed by GitHub
parent 5f261cf2d8
commit 239f725b32
7 changed files with 83 additions and 66 deletions

View File

@@ -36,6 +36,7 @@ def connect(
read_consistency_interval: Optional[timedelta] = None,
request_thread_pool: Optional[Union[int, ThreadPoolExecutor]] = None,
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> DBConnection:
"""Connect to a LanceDB database.
@@ -67,6 +68,9 @@ def connect(
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
default configuration is used.
storage_options: dict, optional
Additional options for the storage backend. See available options at
https://lancedb.github.io/lancedb/guides/storage/
Examples
--------
@@ -111,7 +115,11 @@ def connect(
if kwargs:
raise ValueError(f"Unknown keyword arguments: {kwargs}")
return LanceDBConnection(uri, read_consistency_interval=read_consistency_interval)
return LanceDBConnection(
uri,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
)
async def connect_async(

View File

@@ -13,34 +13,29 @@
from __future__ import annotations
import asyncio
import os
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
import pyarrow as pa
from overrides import EnforceOverrides, override
from pyarrow import fs
from lancedb.common import data_to_reader, validate_schema
from lancedb.common import data_to_reader, sanitize_uri, validate_schema
from lancedb.background_loop import BackgroundEventLoop
from ._lancedb import connect as lancedb_connect
from .table import (
AsyncTable,
LanceTable,
Table,
_table_path,
sanitize_create_table,
)
from .util import (
fs_from_uri,
get_uri_location,
get_uri_scheme,
validate_table_name,
)
if TYPE_CHECKING:
import pyarrow as pa
from .pydantic import LanceModel
from datetime import timedelta
@@ -48,6 +43,8 @@ if TYPE_CHECKING:
from .common import DATA, URI
from .embeddings import EmbeddingFunctionConfig
LOOP = BackgroundEventLoop()
class DBConnection(EnforceOverrides):
"""An active LanceDB connection interface."""
@@ -180,6 +177,7 @@ class DBConnection(EnforceOverrides):
control over how data is saved, either provide the PyArrow schema to
convert to or else provide a [PyArrow Table](pyarrow.Table) directly.
>>> import pyarrow as pa
>>> custom_schema = pa.schema([
... pa.field("vector", pa.list_(pa.float32(), 2)),
... pa.field("lat", pa.float32()),
@@ -327,7 +325,11 @@ class LanceDBConnection(DBConnection):
"""
def __init__(
self, uri: URI, *, read_consistency_interval: Optional[timedelta] = None
self,
uri: URI,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
):
if not isinstance(uri, Path):
scheme = get_uri_scheme(uri)
@@ -338,9 +340,27 @@ class LanceDBConnection(DBConnection):
uri = uri.expanduser().absolute()
Path(uri).mkdir(parents=True, exist_ok=True)
self._uri = str(uri)
self._entered = False
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options
if read_consistency_interval is not None:
read_consistency_interval_secs = read_consistency_interval.total_seconds()
else:
read_consistency_interval_secs = None
async def do_connect():
return await lancedb_connect(
sanitize_uri(uri),
None,
None,
None,
read_consistency_interval_secs,
None,
storage_options,
)
self._conn = AsyncConnection(LOOP.run(do_connect()))
def __repr__(self) -> str:
val = f"{self.__class__.__name__}({self._uri}"
@@ -364,32 +384,7 @@ class LanceDBConnection(DBConnection):
Iterator of str.
A list of table names.
"""
try:
asyncio.get_running_loop()
# User application is async. Soon we will just tell them to use the
# async version. Until then fallback to the old sync implementation.
try:
filesystem = fs_from_uri(self.uri)[0]
except pa.ArrowInvalid:
raise NotImplementedError("Unsupported scheme: " + self.uri)
try:
loc = get_uri_location(self.uri)
paths = filesystem.get_file_info(fs.FileSelector(loc))
except FileNotFoundError:
# It is ok if the file does not exist since it will be created
paths = []
tables = [
os.path.splitext(file_info.base_name)[0]
for file_info in paths
if file_info.extension == "lance"
]
tables.sort()
return tables
except RuntimeError:
# User application is sync. It is safe to use the async implementation
# under the hood.
return asyncio.run(self._async_get_table_names(page_token, limit))
return LOOP.run(self._conn.table_names(start_after=page_token, limit=limit))
def __len__(self) -> int:
return len(self.table_names())
@@ -461,19 +456,16 @@ class LanceDBConnection(DBConnection):
If True, ignore if the table does not exist.
"""
try:
table_uri = _table_path(self.uri, name)
filesystem, path = fs_from_uri(table_uri)
filesystem.delete_dir(path)
except FileNotFoundError:
LOOP.run(self._conn.drop_table(name))
except ValueError as e:
if not ignore_missing:
raise
raise e
if f"Table '{name}' was not found" not in str(e):
raise e
@override
def drop_database(self):
dummy_table_uri = _table_path(self.uri, "dummy")
uri = dummy_table_uri.removesuffix("dummy.lance")
filesystem, path = fs_from_uri(uri)
filesystem.delete_dir(path)
LOOP.run(self._conn.drop_database())
class AsyncConnection(object):
@@ -689,6 +681,7 @@ class AsyncConnection(object):
control over how data is saved, either provide the PyArrow schema to
convert to or else provide a [PyArrow Table](pyarrow.Table) directly.
>>> import pyarrow as pa
>>> custom_schema = pa.schema([
... pa.field("vector", pa.list_(pa.float32(), 2)),
... pa.field("lat", pa.float32()),

View File

@@ -20,19 +20,16 @@ import warnings
from lancedb import connect_async
from lancedb.remote import ClientConfig
from lancedb.remote.background_loop import BackgroundEventLoop
import pyarrow as pa
from overrides import override
from ..common import DATA
from ..db import DBConnection
from ..db import DBConnection, LOOP
from ..embeddings import EmbeddingFunctionConfig
from ..pydantic import LanceModel
from ..table import Table
from ..util import validate_table_name
LOOP = BackgroundEventLoop()
class RemoteDBConnection(DBConnection):
"""A connection to a remote LanceDB database."""

View File

@@ -1077,13 +1077,16 @@ class _LanceLatestDatasetRef(_LanceDatasetRef):
index_cache_size: Optional[int] = None
read_consistency_interval: Optional[timedelta] = None
last_consistency_check: Optional[float] = None
storage_options: Optional[Dict[str, str]] = None
_dataset: Optional[LanceDataset] = None
@property
def dataset(self) -> LanceDataset:
if not self._dataset:
self._dataset = lance.dataset(
self.uri, index_cache_size=self.index_cache_size
self.uri,
index_cache_size=self.index_cache_size,
storage_options=self.storage_options,
)
self.last_consistency_check = time.monotonic()
elif self.read_consistency_interval is not None:
@@ -1114,13 +1117,17 @@ class _LanceTimeTravelRef(_LanceDatasetRef):
uri: str
version: int
index_cache_size: Optional[int] = None
storage_options: Optional[Dict[str, str]] = None
_dataset: Optional[LanceDataset] = None
@property
def dataset(self) -> LanceDataset:
if not self._dataset:
self._dataset = lance.dataset(
self.uri, version=self.version, index_cache_size=self.index_cache_size
self.uri,
version=self.version,
index_cache_size=self.index_cache_size,
storage_options=self.storage_options,
)
return self._dataset
@@ -1169,24 +1176,27 @@ class LanceTable(Table):
uri=self._dataset_uri,
version=version,
index_cache_size=index_cache_size,
storage_options=connection.storage_options,
)
else:
self._ref = _LanceLatestDatasetRef(
uri=self._dataset_uri,
read_consistency_interval=connection.read_consistency_interval,
index_cache_size=index_cache_size,
storage_options=connection.storage_options,
)
@classmethod
def open(cls, db, name, **kwargs):
tbl = cls(db, name, **kwargs)
fs, path = fs_from_uri(tbl._dataset_path)
file_info = fs.get_file_info(path)
if file_info.type != pa.fs.FileType.Directory:
raise FileNotFoundError(
f"Table {name} does not exist."
f"Please first call db.create_table({name}, data)"
)
# check the dataset exists
try:
tbl.version
except ValueError as e:
if "Not found:" in str(e):
raise FileNotFoundError(f"Table {name} does not exist")
raise e
return tbl
@@ -1617,7 +1627,11 @@ class LanceTable(Table):
# Access the dataset_mut property to ensure that the dataset is mutable.
self._ref.dataset_mut
self._ref.dataset = lance.write_dataset(
data, self._dataset_uri, schema=self.schema, mode=mode
data,
self._dataset_uri,
schema=self.schema,
mode=mode,
storage_options=self._ref.storage_options,
)
def merge(
@@ -1902,7 +1916,13 @@ class LanceTable(Table):
empty = pa.Table.from_batches([], schema=schema)
try:
lance.write_dataset(empty, tbl._dataset_uri, schema=schema, mode=mode)
lance.write_dataset(
empty,
tbl._dataset_uri,
schema=schema,
mode=mode,
storage_options=db.storage_options,
)
except OSError as err:
if "Dataset already exists" in str(err) and exist_ok:
if tbl.schema != schema:

View File

@@ -30,6 +30,7 @@ class MockDB:
def __init__(self, uri: Path):
self.uri = str(uri)
self.read_consistency_interval = None
self.storage_options = None
@functools.cached_property
def is_managed_remote(self) -> bool: