docs: add the async python API to the docs (#1156)

This commit is contained in:
Weston Pace
2024-03-26 07:54:16 -05:00
parent ccf13f15d4
commit f97c7dad8c
13 changed files with 623 additions and 399 deletions

View File

@@ -144,34 +144,20 @@ async def connect_async(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
request_thread_pool: int or ThreadPoolExecutor, optional
The thread pool to use for making batch requests to the LanceDB Cloud API.
If an integer, then a ThreadPoolExecutor will be created with that
number of threads. If None, then a ThreadPoolExecutor will be created
with the default number of threads. If a ThreadPoolExecutor, then that
executor will be used for making requests. This is for LanceDB Cloud
only and is only used when making batch requests (i.e., passing in
multiple queries to the search method at once).
Examples
--------
For a local directory, provide a path for the database:
>>> import lancedb
>>> db = lancedb.connect("~/.lancedb")
For object storage, use a URI prefix:
>>> db = lancedb.connect("s3://my-bucket/lancedb")
Connect to LancdDB cloud:
>>> db = lancedb.connect("db://my_database", api_key="ldb_...")
>>> async def doctest_example():
... # For a local directory, provide a path to the database
... db = await lancedb.connect_async("~/.lancedb")
... # For object storage, use a URI prefix
... db = await lancedb.connect_async("s3://my-bucket/lancedb")
Returns
-------
conn : DBConnection
conn : AsyncConnection
A connection to a LanceDB database.
"""
if read_consistency_interval is not None:

View File

@@ -25,7 +25,6 @@ from overrides import EnforceOverrides, override
from pyarrow import fs
from lancedb.common import data_to_reader, validate_schema
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
from lancedb.utils.events import register_event
from ._lancedb import connect as lancedb_connect
@@ -451,16 +450,17 @@ class LanceDBConnection(DBConnection):
class AsyncConnection(object):
"""An active LanceDB connection
To obtain a connection you can use the [connect] function.
To obtain a connection you can use the [connect_async][lancedb.connect_async]
function.
This could be a native connection (using lance) or a remote connection (e.g. for
connecting to LanceDb Cloud)
Local connections do not currently hold any open resources but they may do so in the
future (for example, for shared cache or connections to catalog services) Remote
connections represent an open connection to the remote server. The [close] method
can be used to release any underlying resources eagerly. The connection can also
be used as a context manager:
connections represent an open connection to the remote server. The
[close][lancedb.db.AsyncConnection.close] method can be used to release any
underlying resources eagerly. The connection can also be used as a context manager.
Connections can be shared on multiple threads and are expected to be long lived.
Connections can also be used as a context manager, however, in many cases a single
@@ -471,10 +471,9 @@ class AsyncConnection(object):
Examples
--------
>>> import asyncio
>>> import lancedb
>>> async def my_connect():
... with await lancedb.connect("/tmp/my_dataset") as conn:
>>> async def doctest_example():
... with await lancedb.connect_async("/tmp/my_dataset") as conn:
... # do something with the connection
... pass
... # conn is closed here
@@ -535,9 +534,8 @@ class AsyncConnection(object):
exist_ok: Optional[bool] = None,
on_bad_vectors: Optional[str] = None,
fill_value: Optional[float] = None,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
) -> AsyncTable:
"""Create a [Table][lancedb.table.Table] in the database.
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
Parameters
----------
@@ -576,7 +574,7 @@ class AsyncConnection(object):
Returns
-------
LanceTable
AsyncTable
A reference to the newly created table.
!!! note
@@ -590,12 +588,14 @@ class AsyncConnection(object):
Can create with list of tuples or dictionaries:
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
>>> db.create_table("my_table", data)
LanceTable(connection=..., name="my_table")
>>> db["my_table"].head()
>>> async def doctest_example():
... db = await lancedb.connect_async("./.lancedb")
... data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
... my_table = await db.create_table("my_table", data)
... print(await my_table.query().limit(5).to_arrow())
>>> import asyncio
>>> asyncio.run(doctest_example())
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
@@ -614,9 +614,11 @@ class AsyncConnection(object):
... "lat": [45.5, 40.1],
... "long": [-122.7, -74.1]
... })
>>> db.create_table("table2", data)
LanceTable(connection=..., name="table2")
>>> db["table2"].head()
>>> async def pandas_example():
... db = await lancedb.connect_async("./.lancedb")
... my_table = await db.create_table("table2", data)
... print(await my_table.query().limit(5).to_arrow())
>>> asyncio.run(pandas_example())
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
@@ -636,9 +638,11 @@ class AsyncConnection(object):
... pa.field("lat", pa.float32()),
... pa.field("long", pa.float32())
... ])
>>> db.create_table("table3", data, schema = custom_schema)
LanceTable(connection=..., name="table3")
>>> db["table3"].head()
>>> async def with_schema():
... db = await lancedb.connect_async("./.lancedb")
... my_table = await db.create_table("table3", data, schema = custom_schema)
... print(await my_table.query().limit(5).to_arrow())
>>> asyncio.run(with_schema())
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
@@ -670,9 +674,10 @@ class AsyncConnection(object):
... pa.field("item", pa.utf8()),
... pa.field("price", pa.float32()),
... ])
>>> db.create_table("table4", make_batches(), schema=schema)
LanceTable(connection=..., name="table4")
>>> async def iterable_example():
... db = await lancedb.connect_async("./.lancedb")
... await db.create_table("table4", make_batches(), schema=schema)
>>> asyncio.run(iterable_example())
"""
if inspect.isclass(schema) and issubclass(schema, LanceModel):
# convert LanceModel to pyarrow schema
@@ -681,12 +686,6 @@ class AsyncConnection(object):
schema = schema.to_arrow_schema()
metadata = None
if embedding_functions is not None:
# If we passed in embedding functions explicitly
# then we'll override any schema metadata that
# may was implicitly specified by the LanceModel schema
registry = EmbeddingFunctionRegistry.get_instance()
metadata = registry.get_table_metadata(embedding_functions)
# Defining defaults here and not in function prototype. In the future
# these defaults will move into rust so better to keep them as None.
@@ -767,11 +766,11 @@ class AsyncConnection(object):
name: str
The name of the table.
"""
raise NotImplementedError
await self._inner.drop_table(name)
async def drop_database(self):
"""
Drop database
This is the same thing as dropping all the tables
"""
raise NotImplementedError
await self._inner.drop_db()

View File

@@ -1033,7 +1033,7 @@ class AsyncQueryBase(object):
Construct an AsyncQueryBase
This method is not intended to be called directly. Instead, use the
[Table.query][] method to create a query.
[AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query.
"""
self._inner = inner
@@ -1041,7 +1041,10 @@ class AsyncQueryBase(object):
"""
Only return rows matching the given predicate
The predicate should be supplied as an SQL query string. For example:
The predicate should be supplied as an SQL query string.
Examples
--------
>>> predicate = "x > 10"
>>> predicate = "y > 0 AND y < 100"
@@ -1112,7 +1115,8 @@ class AsyncQueryBase(object):
Execute the query and collect the results into an Apache Arrow Table.
This method will collect all results into memory before returning. If
you expect a large number of results, you may want to use [to_batches][]
you expect a large number of results, you may want to use
[to_batches][lancedb.query.AsyncQueryBase.to_batches]
"""
batch_iter = await self.to_batches()
return pa.Table.from_batches(
@@ -1123,12 +1127,13 @@ class AsyncQueryBase(object):
"""
Execute the query and collect the results into a pandas DataFrame.
This method will collect all results into memory before returning. If
you expect a large number of results, you may want to use [to_batches][]
and convert each batch to pandas separately.
This method will collect all results into memory before returning. If you
expect a large number of results, you may want to use
[to_batches][lancedb.query.AsyncQueryBase.to_batches] and convert each batch to
pandas separately.
Example
-------
Examples
--------
>>> import asyncio
>>> from lancedb import connect_async
@@ -1148,7 +1153,7 @@ class AsyncQuery(AsyncQueryBase):
Construct an AsyncQuery
This method is not intended to be called directly. Instead, use the
[Table.query][] method to create a query.
[AsyncTable.query][lancedb.table.AsyncTable.query] method to create a query.
"""
super().__init__(inner)
self._inner = inner
@@ -1189,8 +1194,8 @@ class AsyncQuery(AsyncQueryBase):
If there is only one vector column (a column whose data type is a
fixed size list of floats) then the column does not need to be specified.
If there is more than one vector column you must use
[AsyncVectorQuery::column][] to specify which column you would like to
compare with.
[AsyncVectorQuery.column][lancedb.query.AsyncVectorQuery.column] to specify
which column you would like to compare with.
If no index has been created on the vector column then a vector query
will perform a distance comparison between the query vector and every
@@ -1221,8 +1226,10 @@ class AsyncVectorQuery(AsyncQueryBase):
Construct an AsyncVectorQuery
This method is not intended to be called directly. Instead, create
a query first with [Table.query][] and then use [AsyncQuery.nearest_to][]
to convert to a vector query.
a query first with [AsyncTable.query][lancedb.table.AsyncTable.query] and then
use [AsyncQuery.nearest_to][lancedb.query.AsyncQuery.nearest_to] to convert to
a vector query. Or you can use
[AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search]
"""
super().__init__(inner)
self._inner = inner
@@ -1232,7 +1239,7 @@ class AsyncVectorQuery(AsyncQueryBase):
Set the vector column to query
This controls which column is compared to the query vector supplied in
the call to [Query.nearest_to][].
the call to [AsyncQuery.nearest_to][lancedb.query.AsyncQuery.nearest_to].
This parameter must be specified if the table has more than one column
whose data type is a fixed-size-list of floats.

View File

@@ -1885,8 +1885,8 @@ class AsyncTable:
An AsyncTable object is expected to be long lived and reused for multiple
operations. AsyncTable objects will cache a certain amount of index data in memory.
This cache will be freed when the Table is garbage collected. To eagerly free the
cache you can call the [close][AsyncTable.close] method. Once the AsyncTable is
closed, it cannot be used for any further operations.
cache you can call the [close][lancedb.AsyncTable.close] method. Once the
AsyncTable is closed, it cannot be used for any further operations.
An AsyncTable can also be used as a context manager, and will automatically close
when the context is exited. Closing a table is optional. If you do not close the
@@ -1895,13 +1895,17 @@ class AsyncTable:
Examples
--------
Create using [DBConnection.create_table][lancedb.DBConnection.create_table]
Create using [AsyncConnection.create_table][lancedb.AsyncConnection.create_table]
(more examples in that method's documentation).
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2}])
>>> table.head()
>>> async def create_a_table():
... db = await lancedb.connect_async("./.lancedb")
... data = [{"vector": [1.1, 1.2], "b": 2}]
... table = await db.create_table("my_table", data=data)
... print(await table.query().limit(5).to_arrow())
>>> import asyncio
>>> asyncio.run(create_a_table())
pyarrow.Table
vector: fixed_size_list<item: float>[2]
child 0, item: float
@@ -1910,25 +1914,37 @@ class AsyncTable:
vector: [[[1.1,1.2]]]
b: [[2]]
Can append new data with [Table.add()][lancedb.table.Table.add].
Can append new data with [AsyncTable.add()][lancedb.table.AsyncTable.add].
>>> table.add([{"vector": [0.5, 1.3], "b": 4}])
>>> async def add_to_table():
... db = await lancedb.connect_async("./.lancedb")
... table = await db.open_table("my_table")
... await table.add([{"vector": [0.5, 1.3], "b": 4}])
>>> asyncio.run(add_to_table())
Can query the table with [Table.search][lancedb.table.Table.search].
Can query the table with
[AsyncTable.vector_search][lancedb.table.AsyncTable.vector_search].
>>> table.search([0.4, 0.4]).select(["b", "vector"]).to_pandas()
>>> async def search_table_for_vector():
... db = await lancedb.connect_async("./.lancedb")
... table = await db.open_table("my_table")
... results = (
... await table.vector_search([0.4, 0.4]).select(["b", "vector"]).to_pandas()
... )
... print(results)
>>> asyncio.run(search_table_for_vector())
b vector _distance
0 4 [0.5, 1.3] 0.82
1 2 [1.1, 1.2] 1.13
Search queries are much faster when an index is created. See
[Table.create_index][lancedb.table.Table.create_index].
[AsyncTable.create_index][lancedb.table.AsyncTable.create_index].
"""
def __init__(self, table: LanceDBTable):
"""Create a new Table object.
"""Create a new AsyncTable object.
You should not create Table objects directly.
You should not create AsyncTable objects directly.
Use [AsyncConnection.create_table][lancedb.AsyncConnection.create_table] and
[AsyncConnection.open_table][lancedb.AsyncConnection.open_table] to obtain
@@ -1980,6 +1996,14 @@ class AsyncTable:
return await self._inner.count_rows(filter)
def query(self) -> AsyncQuery:
"""
Returns an [AsyncQuery][lancedb.query.AsyncQuery] that can be used
to search the table.
Use methods on the returned query to control query behavior. The query
can be executed with methods like [to_arrow][lancedb.query.AsyncQuery.to_arrow],
[to_pandas][lancedb.query.AsyncQuery.to_pandas] and more.
"""
return AsyncQuery(self._inner.query())
async def to_pandas(self) -> "pd.DataFrame":
@@ -2016,20 +2040,8 @@ class AsyncTable:
Parameters
----------
index: Index
The index to create.
LanceDb supports multiple types of indices. See the static methods on
the Index class for more details.
column: str, default None
column: str
The column to index.
When building a scalar index this must be set.
When building a vector index, this is optional. The default will look
for any columns of type fixed-size-list with floating point values. If
there is only one column of this type then it will be used. Otherwise
an error will be returned.
replace: bool, default True
Whether to replace the existing index
@@ -2038,6 +2050,10 @@ class AsyncTable:
that index is out of date.
The default is True
config: Union[IvfPq, BTree], default None
For advanced configuration you can specify the type of index you would
like to create. You can also specify index-specific parameters when
creating an index object.
"""
index = None
if config is not None:
@@ -2159,7 +2175,8 @@ class AsyncTable:
Search the table with a given query vector.
This is a convenience method for preparing a vector query and
is the same thing as calling `nearestTo` on the builder returned
by `query`. Seer [nearest_to][AsyncQuery.nearest_to] for more details.
by `query`. See [nearest_to][lancedb.query.AsyncQuery.nearest_to] for more
details.
"""
return self.query().nearest_to(query_vector)
@@ -2225,7 +2242,7 @@ class AsyncTable:
x vector
0 3 [5.0, 6.0]
"""
raise NotImplementedError
return await self._inner.delete(where)
async def update(
self,
@@ -2281,102 +2298,6 @@ class AsyncTable:
return await self._inner.update(updates_sql, where)
async def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
"""
Clean up old versions of the table, freeing disk space.
Note: This function is not available in LanceDb Cloud (since LanceDb
Cloud manages cleanup for you automatically)
Parameters
----------
older_than: timedelta, default None
The minimum age of the version to delete. If None, then this defaults
to two weeks.
delete_unverified: bool, default False
Because they may be part of an in-progress transaction, files newer
than 7 days old are not deleted by default. If you are sure that
there are no in-progress transactions, then you can set this to True
to delete all files older than `older_than`.
Returns
-------
CleanupStats
The stats of the cleanup operation, including how many bytes were
freed.
"""
raise NotImplementedError
async def compact_files(self, *args, **kwargs):
"""
Run the compaction process on the table.
Note: This function is not available in LanceDb Cloud (since LanceDb
Cloud manages compaction for you automatically)
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
For most cases, the default should be fine.
"""
raise NotImplementedError
async def add_columns(self, transforms: Dict[str, str]):
"""
Add new columns with defined values.
This is not yet available in LanceDB Cloud.
Parameters
----------
transforms: Dict[str, str]
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
"""
raise NotImplementedError
async def alter_columns(self, alterations: Iterable[Dict[str, str]]):
"""
Alter column names and nullability.
This is not yet available in LanceDB Cloud.
alterations : Iterable[Dict[str, Any]]
A sequence of dictionaries, each with the following keys:
- "path": str
The column path to alter. For a top-level column, this is the name.
For a nested column, this is the dot-separated path, e.g. "a.b.c".
- "name": str, optional
The new name of the column. If not specified, the column name is
not changed.
- "nullable": bool, optional
Whether the column should be nullable. If not specified, the column
nullability is not changed. Only non-nullable columns can be changed
to nullable. Currently, you cannot change a nullable column to
non-nullable.
"""
raise NotImplementedError
async def drop_columns(self, columns: Iterable[str]):
"""
Drop columns from the table.
This is not yet available in LanceDB Cloud.
Parameters
----------
columns : Iterable[str]
The names of the columns to drop.
"""
raise NotImplementedError
async def version(self) -> int:
"""
Retrieve the version of the table

View File

@@ -0,0 +1,162 @@
import shutil
# --8<-- [start:imports]
import lancedb
import pandas as pd
import pyarrow as pa
# --8<-- [end:imports]
import pytest
from numpy.random import randint, random
# Start from a clean slate so repeated test runs do not fail on
# already-existing tables in the sample database directory.
shutil.rmtree("data/sample-lancedb", ignore_errors=True)
def test_quickstart():
    """Exercise the synchronous-client quickstart snippets used in the docs.

    The ``--8<-- [start:...]`` / ``--8<-- [end:...]`` marker comments delimit
    regions that are extracted verbatim into the documentation, so they must
    be kept intact and the code between them must stay presentable.
    """
    # --8<-- [start:connect]
    uri = "data/sample-lancedb"
    db = lancedb.connect(uri)
    # --8<-- [end:connect]

    # --8<-- [start:create_table]
    data = [
        {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
        {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
    ]

    # Synchronous client
    tbl = db.create_table("my_table", data=data)
    # --8<-- [end:create_table]

    # --8<-- [start:create_table_pandas]
    df = pd.DataFrame(
        [
            {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
            {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
        ]
    )

    # Synchronous client
    tbl = db.create_table("table_from_df", data=df)
    # --8<-- [end:create_table_pandas]

    # --8<-- [start:create_empty_table]
    schema = pa.schema([pa.field("vector", pa.list_(pa.float32(), list_size=2))])

    # Synchronous client
    tbl = db.create_table("empty_table", schema=schema)
    # --8<-- [end:create_empty_table]

    # --8<-- [start:open_table]
    # Synchronous client
    tbl = db.open_table("my_table")
    # --8<-- [end:open_table]

    # --8<-- [start:table_names]
    # Synchronous client
    print(db.table_names())
    # --8<-- [end:table_names]

    # Synchronous client
    # --8<-- [start:add_data]
    # Option 1: Add a list of dicts to a table
    data = [
        {"vector": [1.3, 1.4], "item": "fizz", "price": 100.0},
        {"vector": [9.5, 56.2], "item": "buzz", "price": 200.0},
    ]
    tbl.add(data)

    # Option 2: Add a pandas DataFrame to a table
    df = pd.DataFrame(data)
    # Fixed: previously this called `tbl.add(data)` again, which double-inserted
    # the dicts and left `df` unused — the snippet did not demonstrate adding a
    # DataFrame at all.
    tbl.add(df)
    # --8<-- [end:add_data]

    # --8<-- [start:vector_search]
    # Synchronous client
    tbl.search([100, 100]).limit(2).to_pandas()
    # --8<-- [end:vector_search]

    # Add enough rows for the IVF-PQ index training below to succeed.
    tbl.add(
        [
            {"vector": random(2), "item": "autogen", "price": randint(100)}
            for _ in range(1000)
        ]
    )

    # --8<-- [start:create_index]
    # Synchronous client
    tbl.create_index(num_sub_vectors=1)
    # --8<-- [end:create_index]

    # --8<-- [start:delete_rows]
    # Synchronous client
    tbl.delete('item = "fizz"')
    # --8<-- [end:delete_rows]

    # --8<-- [start:drop_table]
    # Synchronous client
    db.drop_table("my_table")
    # --8<-- [end:drop_table]
@pytest.mark.asyncio
async def test_quickstart_async():
    """Exercise the asynchronous-client quickstart snippets used in the docs.

    The ``--8<-- [start:...]`` / ``--8<-- [end:...]`` marker comments delimit
    regions that are extracted verbatim into the documentation, so they must
    be kept intact.
    """
    # --8<-- [start:connect_async]
    # LanceDb offers both a synchronous and an asynchronous client.  There are still a
    # few operations that are only supported by the synchronous client (e.g. embedding
    # functions, full text search) but both APIs should soon be equivalent

    # In this guide we will give examples of both clients.  In other guides we will
    # typically only provide examples with one client or the other.
    uri = "data/sample-lancedb"
    async_db = await lancedb.connect_async(uri)
    # --8<-- [end:connect_async]

    data = [
        {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
        {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
    ]

    # --8<-- [start:create_table_async]
    # Asynchronous client
    async_tbl = await async_db.create_table("my_table2", data=data)
    # --8<-- [end:create_table_async]

    df = pd.DataFrame(
        [
            {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
            {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
        ]
    )

    # --8<-- [start:create_table_async_pandas]
    # Asynchronous client
    async_tbl = await async_db.create_table("table_from_df2", df)
    # --8<-- [end:create_table_async_pandas]

    schema = pa.schema([pa.field("vector", pa.list_(pa.float32(), list_size=2))])

    # --8<-- [start:create_empty_table_async]
    # Asynchronous client
    async_tbl = await async_db.create_table("empty_table2", schema=schema)
    # --8<-- [end:create_empty_table_async]

    # --8<-- [start:open_table_async]
    # Asynchronous client
    async_tbl = await async_db.open_table("my_table2")
    # --8<-- [end:open_table_async]

    # --8<-- [start:table_names_async]
    # Asynchronous client
    print(await async_db.table_names())
    # --8<-- [end:table_names_async]

    # --8<-- [start:add_data_async]
    # Asynchronous client
    await async_tbl.add(data)
    # --8<-- [end:add_data_async]

    # Add sufficient data for training
    data = [{"vector": [x, x], "item": "filler", "price": x * x} for x in range(1000)]
    await async_tbl.add(data)

    # --8<-- [start:vector_search_async]
    # Asynchronous client
    await async_tbl.vector_search([100, 100]).limit(2).to_pandas()
    # --8<-- [end:vector_search_async]

    # --8<-- [start:create_index_async]
    # Asynchronous client (must specify column to index)
    await async_tbl.create_index("vector")
    # --8<-- [end:create_index_async]

    # --8<-- [start:delete_rows_async]
    # Asynchronous client
    # NOTE(review): no rows with item "fizz" are ever added in this async test
    # (only "foo", "bar", and "filler"), so this delete matches nothing —
    # confirm the predicate is intentional for the docs snippet.
    await async_tbl.delete('item = "fizz"')
    # --8<-- [end:delete_rows_async]

    # --8<-- [start:drop_table_async]
    # Asynchronous client
    await async_db.drop_table("my_table2")
    # --8<-- [end:drop_table_async]