diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py
index e2d88b98..a068af12 100644
--- a/python/python/lancedb/remote/table.py
+++ b/python/python/lancedb/remote/table.py
@@ -11,6 +11,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import timedelta
 import asyncio
 import logging
 from functools import cached_property
@@ -478,6 +479,19 @@ class RemoteTable(Table):
             "compact_files() is not supported on the LanceDB cloud"
         )
 
+    def optimize(
+        self,
+        *,
+        cleanup_older_than: Optional[timedelta] = None,
+        delete_unverified: bool = False,
+    ):
+        """optimize() is not supported on the LanceDB cloud.
+        Indices are optimized automatically."""
+        raise NotImplementedError(
+            "optimize() is not supported on the LanceDB cloud. "
+            "Indices are optimized automatically."
+        )
+
     def count_rows(self, filter: Optional[str] = None) -> int:
         return self._loop.run_until_complete(self._table.count_rows(filter))
 
diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py
index 18e2c266..6403c88f 100644
--- a/python/python/lancedb/table.py
+++ b/python/python/lancedb/table.py
@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import inspect
 import time
 from abc import ABC, abstractmethod
@@ -32,7 +33,7 @@ import pyarrow.fs as pa_fs
 from lance import LanceDataset
 from lance.dependencies import _check_for_hugging_face
 
-from .common import DATA, VEC, VECTOR_COLUMN_NAME
+from .common import DATA, VEC, VECTOR_COLUMN_NAME, sanitize_uri
 from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
 from .merge import LanceMergeInsertBuilder
 from .pydantic import LanceModel, model_to_dict
@@ -57,6 +58,8 @@ from .util import (
 )
 from .index import lang_mapping
 
+from ._lancedb import connect as lancedb_connect
+
 if TYPE_CHECKING:
     import PIL
     from lance.dataset import CleanupStats, ReaderLike
@@ -893,6 +896,55 @@ class Table(ABC):
         For most cases, the default should be fine.
         """
 
+    @abstractmethod
+    def optimize(
+        self,
+        *,
+        cleanup_older_than: Optional[timedelta] = None,
+        delete_unverified: bool = False,
+    ):
+        """
+        Optimize the on-disk data and indices for better performance.
+
+        Modeled after ``VACUUM`` in PostgreSQL.
+
+        Optimization covers three operations:
+
+        * Compaction: Merges small files into larger ones
+        * Prune: Removes old versions of the dataset
+        * Index: Optimizes the indices, adding new data to existing indices
+
+        Parameters
+        ----------
+        cleanup_older_than: timedelta, optional, default 7 days
+            All files belonging to versions older than this will be removed. Set
+            to 0 days to remove all versions except the latest. The latest version
+            is never removed.
+        delete_unverified: bool, default False
+            Files leftover from a failed transaction may appear to be part of an
+            in-progress operation (e.g. appending new data) and these files will not
+            be deleted unless they are at least 7 days old. If delete_unverified is
+            True then these files will be deleted regardless of their age.
+
+        Experimental API
+        ----------------
+
+        The optimization process is undergoing active development and may change.
+        Our goal with these changes is to improve the performance of optimization and
+        reduce the complexity.
+
+        That being said, it is essential today to run optimize if you want the best
+        performance. It should be stable and safe to use in production, but it is our
+        hope that the API may be simplified (or not even need to be called) in the
+        future.
+
+        The frequency with which an application should call optimize depends on the
+        frequency of data modifications. If data is frequently added, deleted, or
+        updated then optimize should be run frequently. A good rule of thumb is to
+        run optimize after you have added or modified 100,000 or more records or run
+        more than 20 data modification operations.
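+
+        Examples
+        --------
+        A minimal usage sketch (the database path ``data/lance-db`` and the table
+        name ``my_table`` are placeholders):
+
+        >>> import lancedb
+        >>> from datetime import timedelta
+        >>> db = lancedb.connect("data/lance-db")  # doctest: +SKIP
+        >>> table = db.open_table("my_table")  # doctest: +SKIP
+        >>> # compact small files, drop versions older than one day, and add
+        >>> # newly written rows to the existing indices
+        >>> table.optimize(cleanup_older_than=timedelta(days=1))  # doctest: +SKIP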
+        """
+
     @abstractmethod
     def add_columns(self, transforms: Dict[str, str]):
         """
@@ -1971,6 +2023,83 @@ class LanceTable(Table):
         """
         return self.to_lance().optimize.compact_files(*args, **kwargs)
 
+    def optimize(
+        self,
+        *,
+        cleanup_older_than: Optional[timedelta] = None,
+        delete_unverified: bool = False,
+    ):
+        """
+        Optimize the on-disk data and indices for better performance.
+
+        Modeled after ``VACUUM`` in PostgreSQL.
+
+        Optimization covers three operations:
+
+        * Compaction: Merges small files into larger ones
+        * Prune: Removes old versions of the dataset
+        * Index: Optimizes the indices, adding new data to existing indices
+
+        Parameters
+        ----------
+        cleanup_older_than: timedelta, optional, default 7 days
+            All files belonging to versions older than this will be removed. Set
+            to 0 days to remove all versions except the latest. The latest version
+            is never removed.
+        delete_unverified: bool, default False
+            Files leftover from a failed transaction may appear to be part of an
+            in-progress operation (e.g. appending new data) and these files will not
+            be deleted unless they are at least 7 days old. If delete_unverified is
+            True then these files will be deleted regardless of their age.
+
+        Experimental API
+        ----------------
+
+        The optimization process is undergoing active development and may change.
+        Our goal with these changes is to improve the performance of optimization and
+        reduce the complexity.
+
+        That being said, it is essential today to run optimize if you want the best
+        performance. It should be stable and safe to use in production, but it is our
+        hope that the API may be simplified (or not even need to be called) in the
+        future.
+
+        The frequency with which an application should call optimize depends on the
+        frequency of data modifications. If data is frequently added, deleted, or
+        updated then optimize should be run frequently. A good rule of thumb is to
+        run optimize after you have added or modified 100,000 or more records or run
+        more than 20 data modification operations.
+        """
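+        # asyncio.get_running_loop() raises RuntimeError when no event loop is
+        # running, which is the normal synchronous case. If it returns instead,
+        # we are inside a live event loop where asyncio.run() cannot be used,
+        # so we direct the caller to the asynchronous API.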
" + "If you are writing an asynchronous application " + "then please use the asynchronous APIs" + ) + + except RuntimeError: + asyncio.run( + self._async_optimize( + cleanup_older_than=cleanup_older_than, + delete_unverified=delete_unverified, + ) + ) + self.checkout_latest() + + async def _async_optimize( + self, + cleanup_older_than: Optional[timedelta] = None, + delete_unverified: bool = False, + ): + conn = await lancedb_connect( + sanitize_uri(self._conn.uri), + ) + table = AsyncTable(await conn.open_table(self.name)) + return await table.optimize( + cleanup_older_than=cleanup_older_than, delete_unverified=delete_unverified + ) + def add_columns(self, transforms: Dict[str, str]): self._dataset_mut.add_columns(transforms) diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index bdf22ddf..7ed367cb 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -1223,6 +1223,54 @@ async def test_time_travel(db_async: AsyncConnection): await table.restore() +def test_sync_optimize(db): + table = LanceTable.create( + db, + "test", + data=[ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + ) + + table.create_scalar_index("price", index_type="BTREE") + stats = table.to_lance().stats.index_stats("price_idx") + assert stats["num_indexed_rows"] == 2 + + table.add([{"vector": [2.0, 2.0], "item": "baz", "price": 30.0}]) + assert table.count_rows() == 3 + table.optimize() + stats = table.to_lance().stats.index_stats("price_idx") + assert stats["num_indexed_rows"] == 3 + + +@pytest.mark.asyncio +async def test_sync_optimize_in_async(db): + table = LanceTable.create( + db, + "test", + data=[ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + ) + + table.create_scalar_index("price", index_type="BTREE") + stats = table.to_lance().stats.index_stats("price_idx") + assert stats["num_indexed_rows"] == 2 + + table.add([{"vector": [2.0, 2.0], "item": "baz", "price": 30.0}]) + assert table.count_rows() == 3 + try: + table.optimize() + except Exception as e: + assert ( + "Synchronous method called in asynchronous context. " + "If you are writing an asynchronous application " + "then please use the asynchronous APIs" in str(e) + ) + + @pytest.mark.asyncio async def test_optimize(db_async: AsyncConnection): table = await db_async.create_table(