diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index 9dff65c5..3809c511 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -18,6 +18,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Iterable, List, Optional, Union from urllib.parse import urlparse +from cachetools import TTLCache import pyarrow as pa from overrides import override @@ -29,7 +30,6 @@ from ..table import Table, _sanitize_data from ..util import validate_table_name from .arrow import to_ipc_binary from .client import ARROW_STREAM_CONTENT_TYPE, RestfulLanceDBClient -from .errors import LanceDBClientError class RemoteDBConnection(DBConnection): @@ -60,6 +60,7 @@ class RemoteDBConnection(DBConnection): read_timeout=read_timeout, ) self._request_thread_pool = request_thread_pool + self._table_cache = TTLCache(maxsize=10000, ttl=300) def __repr__(self) -> str: return f"RemoteConnect(name={self.db_name})" @@ -89,6 +90,7 @@ class RemoteDBConnection(DBConnection): else: break for item in result: + self._table_cache[item] = True yield item @override @@ -109,16 +111,10 @@ class RemoteDBConnection(DBConnection): self._client.mount_retry_adapter_for_table(name) # check if table exists - try: + if self._table_cache.get(name) is None: self._client.post(f"/v1/table/{name}/describe/") - except LanceDBClientError as err: - if str(err).startswith("Not found"): - logging.error( - "Table %s does not exist. 
Please first call " - "db.create_table(%s, data).", - name, - name, - ) + self._table_cache[name] = True + return RemoteTable(self, name) @override @@ -267,6 +263,7 @@ class RemoteDBConnection(DBConnection): content_type=ARROW_STREAM_CONTENT_TYPE, ) + self._table_cache[name] = True return RemoteTable(self, name) @override @@ -282,6 +279,25 @@ class RemoteDBConnection(DBConnection): self._client.post( f"/v1/table/{name}/drop/", ) + self._table_cache.pop(name, None) + + @override + def rename_table(self, cur_name: str, new_name: str): + """Rename a table in the database. + + Parameters + ---------- + cur_name: str + The current name of the table. + new_name: str + The new name of the table. + """ + self._client.post( + f"/v1/table/{cur_name}/rename/", + json={"new_table_name": new_name}, + ) + self._table_cache.pop(cur_name, None) + self._table_cache[new_name] = True async def close(self): """Close the connection to the database.""" diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index d17410f8..a82c52ed 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -72,7 +72,7 @@ class RemoteTable(Table): return resp def index_stats(self, index_uuid: str): - """List all the indices on the table""" + """List all the stats of a specified index""" resp = self._conn._client.post( f"/v1/table/{self._name}/index/{index_uuid}/stats/" ) diff --git a/rust/lancedb/src/index/vector.rs b/rust/lancedb/src/index/vector.rs index 11f25fee..c2637378 100644 --- a/rust/lancedb/src/index/vector.rs +++ b/rust/lancedb/src/index/vector.rs @@ -46,10 +46,18 @@ impl VectorIndex { } } +#[derive(Debug, Deserialize)] +pub struct VectorIndexMetadata { + pub metric_type: String, + pub index_type: String, +} + #[derive(Debug, Deserialize)] pub struct VectorIndexStatistics { pub num_indexed_rows: usize, pub num_unindexed_rows: usize, + pub index_type: String, + pub indices: Vec<VectorIndexMetadata>, } /// Builder for an IVF PQ index. 
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 7331e29c..8efbd115 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -14,6 +14,7 @@ //! LanceDB Table APIs +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -757,6 +758,8 @@ pub struct NativeTable { // the object store wrapper to use on write path store_wrapper: Option<Arc<dyn WrappingObjectStore>>, + storage_options: HashMap<String, String>, + // This comes from the connection options. We store here so we can pass down // to the dataset when we recreate it (for example, in checkout_latest). read_consistency_interval: Option<std::time::Duration>, @@ -822,6 +825,13 @@ impl NativeTable { None => params, }; + let storage_options = params + .store_options + .clone() + .unwrap_or_default() + .storage_options + .unwrap_or_default(); + let dataset = DatasetBuilder::from_uri(uri) .with_read_params(params) .load() @@ -840,6 +850,7 @@ impl NativeTable { uri: uri.to_string(), dataset, store_wrapper: write_store_wrapper, + storage_options, read_consistency_interval, }) } @@ -908,6 +919,13 @@ impl NativeTable { None => params, }; + let storage_options = params + .store_params + .clone() + .unwrap_or_default() + .storage_options + .unwrap_or_default(); + let dataset = Dataset::write(batches, uri, Some(params)) .await .map_err(|e| match e { @@ -921,6 +939,7 @@ impl NativeTable { uri: uri.to_string(), dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), store_wrapper: write_store_wrapper, + storage_options, read_consistency_interval, }) } @@ -1042,6 +1061,26 @@ impl NativeTable { } } + pub async fn get_index_type(&self, index_uuid: &str) -> Result<Option<String>> { + match self.load_index_stats(index_uuid).await? { + Some(stats) => Ok(Some(stats.index_type)), + None => Ok(None), + } + } + + pub async fn get_distance_type(&self, index_uuid: &str) -> Result<Option<String>> { + match self.load_index_stats(index_uuid).await? 
{ + Some(stats) => Ok(Some( + stats + .indices + .iter() + .map(|i| i.metric_type.clone()) + .collect(), + )), + None => Ok(None), + } + } + pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> { let dataset = self.dataset.get().await?; let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?; @@ -1312,7 +1351,7 @@ impl TableInternal for NativeTable { add: AddDataBuilder, data: Box<dyn RecordBatchReader + Send>, ) -> Result<()> { - let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { + let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { mode: match add.mode { AddDataMode::Append => WriteMode::Append, AddDataMode::Overwrite => WriteMode::Overwrite, @@ -1320,6 +1359,18 @@ impl TableInternal for NativeTable { ..Default::default() }); + // Bring storage options from table + let storage_options = lance_params + .store_params + .get_or_insert(Default::default()) + .storage_options + .get_or_insert(Default::default()); + for (key, value) in self.storage_options.iter() { + if !storage_options.contains_key(key) { + storage_options.insert(key.clone(), value.clone()); + } + } + // patch the params if we have a write store wrapper let lance_params = match self.store_wrapper.clone() { Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?,