feat: dynamodb commit store support (#1410)

This allows users to specify URIs like:

```
s3+ddb://my_bucket/path?ddbTableName=myCommitTable
```

and LanceDB will use the named DynamoDB table as a commit store, enabling safe concurrent writes to tables stored on S3.
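For example, a minimal sketch of the user-facing flow (the bucket and commit-table names are placeholders, and the DynamoDB table must already exist):

```python
import lancedb

# `ddbTableName` points LanceDB at the DynamoDB table to use as the commit store.
db = lancedb.connect("s3+ddb://my_bucket/path?ddbTableName=myCommitTable")
tbl = db.create_table("my_table", [{"x": 1}, {"x": 2}])
```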

* [x] Add DynamoDB integration tests
* [x] Make the feature work in the Python sync API
* [x] Add a documentation section describing how to configure it

Closes #534

---------

Co-authored-by: universalmind303 <cory.grinstead@gmail.com>
Authored by Will Jones on 2024-06-28 09:30:36 -07:00, committed by GitHub
parent d6485f1215, commit 865ed99881
13 changed files with 1844 additions and 58 deletions


@@ -28,12 +28,11 @@ from lancedb.common import data_to_reader, validate_schema
 from ._lancedb import connect as lancedb_connect
 from .pydantic import LanceModel
-from .table import AsyncTable, LanceTable, Table, _sanitize_data
+from .table import AsyncTable, LanceTable, Table, _sanitize_data, _table_path
 from .util import (
     fs_from_uri,
     get_uri_location,
     get_uri_scheme,
     join_uri,
     validate_table_name,
 )
@@ -457,16 +456,18 @@ class LanceDBConnection(DBConnection):
             If True, ignore if the table does not exist.
         """
         try:
-            filesystem, path = fs_from_uri(self.uri)
-            table_path = join_uri(path, name + ".lance")
-            filesystem.delete_dir(table_path)
+            table_uri = _table_path(self.uri, name)
+            filesystem, path = fs_from_uri(table_uri)
+            filesystem.delete_dir(path)
         except FileNotFoundError:
             if not ignore_missing:
                 raise

     @override
     def drop_database(self):
-        filesystem, path = fs_from_uri(self.uri)
+        dummy_table_uri = _table_path(self.uri, "dummy")
+        uri = dummy_table_uri.removesuffix("dummy.lance")
+        filesystem, path = fs_from_uri(uri)
         filesystem.delete_dir(path)
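The "dummy" round-trip in `drop_database` exists because `_table_path` (added in the table module below) both normalizes the `s3+ddb` scheme and strips query parameters; removing the suffix then leaves a clean database root. A sketch with a hypothetical base URI:

```python
# Hypothetical base URI; the results follow from the `_table_path` logic below.
base = "s3+ddb://my_bucket/path?ddbTableName=myCommitTable"
dummy_table_uri = _table_path(base, "dummy")       # "s3://my_bucket/path/dummy.lance"
uri = dummy_table_uri.removesuffix("dummy.lance")  # "s3://my_bucket/path/"
```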


@@ -30,6 +30,7 @@ from typing import (
     Tuple,
     Union,
 )
+from urllib.parse import urlparse

 import lance
 import numpy as np
@@ -47,6 +48,7 @@ from .pydantic import LanceModel, model_to_dict
 from .query import AsyncQuery, AsyncVectorQuery, LanceQueryBuilder, Query
 from .util import (
     fs_from_uri,
+    get_uri_scheme,
     inf_vector_column_query,
     join_uri,
     safe_import_pandas,
@@ -208,6 +210,26 @@ def _to_record_batch_generator(
         yield b

+
+def _table_path(base: str, table_name: str) -> str:
+    """
+    Get a table path that can be used in PyArrow FS.
+
+    Removes any weird schemes (such as "s3+ddb") and drops any query params.
+    """
+    uri = _table_uri(base, table_name)
+    # Parse as URL
+    parsed = urlparse(uri)
+    # If scheme is s3+ddb, convert to s3
+    if parsed.scheme == "s3+ddb":
+        parsed = parsed._replace(scheme="s3")
+    # Remove query parameters
+    return parsed._replace(query=None).geturl()
+
+
+def _table_uri(base: str, table_name: str) -> str:
+    return join_uri(base, f"{table_name}.lance")
+
+
 class Table(ABC):
     """
     A Table is a collection of Records in a LanceDB Database.
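Taken together, `_table_uri` preserves the user-facing URI (scheme and query string intact) while `_table_path` yields a location that PyArrow's filesystem layer can open. A quick sketch with hypothetical values, following directly from the logic above:

```python
# Hypothetical URI; outputs follow from the urlparse-based helpers above.
_table_uri("s3+ddb://bucket/db?ddbTableName=commits", "t")
# -> "s3+ddb://bucket/db/t.lance?ddbTableName=commits"
_table_path("s3+ddb://bucket/db?ddbTableName=commits", "t")
# -> "s3://bucket/db/t.lance"
```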
@@ -908,7 +930,7 @@ class LanceTable(Table):
     @classmethod
     def open(cls, db, name, **kwargs):
         tbl = cls(db, name, **kwargs)
-        fs, path = fs_from_uri(tbl._dataset_uri)
+        fs, path = fs_from_uri(tbl._dataset_path)
         file_info = fs.get_file_info(path)
         if file_info.type != pa.fs.FileType.Directory:
             raise FileNotFoundError(
@@ -918,9 +940,14 @@ class LanceTable(Table):
         return tbl

-    @property
+    @cached_property
+    def _dataset_path(self) -> str:
+        # Cacheable since it's deterministic
+        return _table_path(self._conn.uri, self.name)
+
+    @cached_property
     def _dataset_uri(self) -> str:
-        return join_uri(self._conn.uri, f"{self.name}.lance")
+        return _table_uri(self._conn.uri, self.name)

     @property
     def _dataset(self) -> LanceDataset:
@@ -1230,6 +1257,10 @@
         )

     def _get_fts_index_path(self):
+        if get_uri_scheme(self._dataset_uri) != "file":
+            raise NotImplementedError(
+                "Full-text search is not supported on object stores."
+            )
         return join_uri(self._dataset_uri, "_indices", "tantivy")

     def add(
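With this guard, creating an FTS index on an object-store-backed table now fails fast instead of writing a Tantivy index that could not be read back. A hypothetical illustration:

```python
# `db` is a connection to an s3:// or s3+ddb:// database (hypothetical).
table = db.open_table("my_table")
table.create_fts_index("text")  # raises NotImplementedError
```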


@@ -139,8 +139,11 @@ def join_uri(base: Union[str, pathlib.Path], *parts: str) -> str:
         # using pathlib for local paths make this windows compatible
         # `get_uri_scheme` returns `file` for windows drive names (e.g. `c:\path`)
         return str(pathlib.Path(base, *parts))
-    # for remote paths, just use os.path.join
-    return "/".join([p.rstrip("/") for p in [base, *parts]])
+    else:
+        # there might be query parameters in the base URI
+        url = urlparse(base)
+        new_path = "/".join([p.rstrip("/") for p in [url.path, *parts]])
+        return url._replace(path=new_path).geturl()


 def attempt_import_or_raise(module: str, mitigation=None):
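The practical effect is that query parameters on the base URI now survive the join. A quick sketch, following from the branch above:

```python
# Hypothetical URIs; behavior follows from the urlparse-based branch above.
join_uri("s3+ddb://bucket/db?ddbTableName=commits", "t.lance")
# -> "s3+ddb://bucket/db/t.lance?ddbTableName=commits"
join_uri("/tmp/db", "t.lance")
# -> "/tmp/db/t.lance"  (local paths still go through pathlib; POSIX shown)
```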


@@ -13,6 +13,8 @@
 import asyncio
+import copy
 from datetime import timedelta
+import threading

 import pytest
 import pyarrow as pa
@@ -25,6 +27,7 @@ CONFIG = {
     "aws_access_key_id": "ACCESSKEY",
     "aws_secret_access_key": "SECRETKEY",
     "aws_endpoint": "http://localhost:4566",
+    "dynamodb_endpoint": "http://localhost:4566",
     "aws_region": "us-east-1",
 }
@@ -156,3 +159,104 @@ def test_s3_sse(s3_bucket: str, kms_key: str):
validate_objects_encrypted(s3_bucket, path, kms_key)
asyncio.run(test())
@pytest.fixture(scope="module")
def commit_table():
ddb = get_boto3_client("dynamodb", endpoint_url=CONFIG["dynamodb_endpoint"])
table_name = "lance-integtest"
try:
ddb.delete_table(TableName=table_name)
except ddb.exceptions.ResourceNotFoundException:
pass
ddb.create_table(
TableName=table_name,
KeySchema=[
{"AttributeName": "base_uri", "KeyType": "HASH"},
{"AttributeName": "version", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "base_uri", "AttributeType": "S"},
{"AttributeName": "version", "AttributeType": "N"},
],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
)
yield table_name
ddb.delete_table(TableName=table_name)
@pytest.mark.s3_test
def test_s3_dynamodb(s3_bucket: str, commit_table: str):
storage_options = copy.copy(CONFIG)
uri = f"s3+ddb://{s3_bucket}/test?ddbTableName={commit_table}"
data = pa.table({"x": [1, 2, 3]})
async def test():
db = await lancedb.connect_async(
uri,
storage_options=storage_options,
read_consistency_interval=timedelta(0),
)
table = await db.create_table("test", data)
# Five concurrent writers
async def insert():
# independent table refs for true concurrent writes.
table = await db.open_table("test")
await table.add(data, mode="append")
tasks = [insert() for _ in range(5)]
await asyncio.gather(*tasks)
row_count = await table.count_rows()
assert row_count == 3 * 6
asyncio.run(test())
@pytest.mark.s3_test
def test_s3_dynamodb_sync(s3_bucket: str, commit_table: str, monkeypatch):
# Sync API doesn't support storage_options, so we have to provide as env vars
for key, value in CONFIG.items():
monkeypatch.setenv(key.upper(), value)
uri = f"s3+ddb://{s3_bucket}/test2?ddbTableName={commit_table}"
data = pa.table({"x": ["a", "b", "c"]})
db = lancedb.connect(
uri,
read_consistency_interval=timedelta(0),
)
table = db.create_table("test_ddb_sync", data)
# Five concurrent writers
def insert():
table = db.open_table("test_ddb_sync")
table.add(data, mode="append")
threads = []
for _ in range(5):
thread = threading.Thread(target=insert)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
row_count = table.count_rows()
assert row_count == 3 * 6
# FTS indices should error since they are not supported yet.
with pytest.raises(
NotImplementedError, match="Full-text search is not supported on object stores."
):
table.create_fts_index("x")
# make sure list tables still works
assert db.table_names() == ["test_ddb_sync"]
db.drop_table("test_ddb_sync")
assert db.table_names() == []
db.drop_database()