feat: support optimize indices in sync API (#1769)

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
This commit is contained in:
BubbleCal
2024-11-09 00:48:07 +08:00
committed by GitHub
parent fa9ca8f7a6
commit 4372c231cd
3 changed files with 192 additions and 1 deletions

View File

@@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import asyncio
import logging
from functools import cached_property
@@ -478,6 +479,19 @@ class RemoteTable(Table):
"compact_files() is not supported on the LanceDB cloud"
)
def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
):
"""optimize() is not supported on the LanceDB cloud.
Indices are optimized automatically."""
raise NotImplementedError(
"optimize() is not supported on the LanceDB cloud. "
"Indices are optimized automatically."
)
def count_rows(self, filter: Optional[str] = None) -> int:
return self._loop.run_until_complete(self._table.count_rows(filter))

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import inspect
import time
from abc import ABC, abstractmethod
@@ -32,7 +33,7 @@ import pyarrow.fs as pa_fs
from lance import LanceDataset
from lance.dependencies import _check_for_hugging_face
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .common import DATA, VEC, VECTOR_COLUMN_NAME, sanitize_uri
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
@@ -57,6 +58,8 @@ from .util import (
)
from .index import lang_mapping
from ._lancedb import connect as lancedb_connect
if TYPE_CHECKING:
import PIL
from lance.dataset import CleanupStats, ReaderLike
@@ -893,6 +896,55 @@ class Table(ABC):
For most cases, the default should be fine.
"""
@abstractmethod
def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
):
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
in-progress operation (e.g. appending new data) and these files will not
be deleted unless they are at least 7 days old. If delete_unverified is True
then these files will be deleted regardless of their age.
Experimental API
----------------
The optimization process is undergoing active development and may change.
Our goal with these changes is to improve the performance of optimization and
reduce the complexity.
That being said, it is essential today to run optimize if you want the best
performance. It should be stable and safe to use in production, but it our
hope that the API may be simplified (or not even need to be called) in the
future.
The frequency an application shoudl call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
@abstractmethod
def add_columns(self, transforms: Dict[str, str]):
"""
@@ -1971,6 +2023,83 @@ class LanceTable(Table):
"""
return self.to_lance().optimize.compact_files(*args, **kwargs)
def optimize(
self,
*,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
):
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
in-progress operation (e.g. appending new data) and these files will not
be deleted unless they are at least 7 days old. If delete_unverified is True
then these files will be deleted regardless of their age.
Experimental API
----------------
The optimization process is undergoing active development and may change.
Our goal with these changes is to improve the performance of optimization and
reduce the complexity.
That being said, it is essential today to run optimize if you want the best
performance. It should be stable and safe to use in production, but it our
hope that the API may be simplified (or not even need to be called) in the
future.
The frequency an application shoudl call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
try:
asyncio.get_running_loop()
raise AssertionError(
"Synchronous method called in asynchronous context. "
"If you are writing an asynchronous application "
"then please use the asynchronous APIs"
)
except RuntimeError:
asyncio.run(
self._async_optimize(
cleanup_older_than=cleanup_older_than,
delete_unverified=delete_unverified,
)
)
self.checkout_latest()
async def _async_optimize(
self,
cleanup_older_than: Optional[timedelta] = None,
delete_unverified: bool = False,
):
conn = await lancedb_connect(
sanitize_uri(self._conn.uri),
)
table = AsyncTable(await conn.open_table(self.name))
return await table.optimize(
cleanup_older_than=cleanup_older_than, delete_unverified=delete_unverified
)
def add_columns(self, transforms: Dict[str, str]):
self._dataset_mut.add_columns(transforms)

View File

@@ -1223,6 +1223,54 @@ async def test_time_travel(db_async: AsyncConnection):
await table.restore()
def test_sync_optimize(db):
table = LanceTable.create(
db,
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
table.create_scalar_index("price", index_type="BTREE")
stats = table.to_lance().stats.index_stats("price_idx")
assert stats["num_indexed_rows"] == 2
table.add([{"vector": [2.0, 2.0], "item": "baz", "price": 30.0}])
assert table.count_rows() == 3
table.optimize()
stats = table.to_lance().stats.index_stats("price_idx")
assert stats["num_indexed_rows"] == 3
@pytest.mark.asyncio
async def test_sync_optimize_in_async(db):
table = LanceTable.create(
db,
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
table.create_scalar_index("price", index_type="BTREE")
stats = table.to_lance().stats.index_stats("price_idx")
assert stats["num_indexed_rows"] == 2
table.add([{"vector": [2.0, 2.0], "item": "baz", "price": 30.0}])
assert table.count_rows() == 3
try:
table.optimize()
except Exception as e:
assert (
"Synchronous method called in asynchronous context. "
"If you are writing an asynchronous application "
"then please use the asynchronous APIs" in str(e)
)
@pytest.mark.asyncio
async def test_optimize(db_async: AsyncConnection):
table = await db_async.create_table(