mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-09 21:32:58 +00:00
prog 1
This commit is contained in:
@@ -18,6 +18,7 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Iterable, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from cachetools import TTLCache
|
||||
import pyarrow as pa
|
||||
from overrides import override
|
||||
|
||||
@@ -29,7 +30,6 @@ from ..table import Table, _sanitize_data
|
||||
from ..util import validate_table_name
|
||||
from .arrow import to_ipc_binary
|
||||
from .client import ARROW_STREAM_CONTENT_TYPE, RestfulLanceDBClient
|
||||
from .errors import LanceDBClientError
|
||||
|
||||
|
||||
class RemoteDBConnection(DBConnection):
|
||||
@@ -60,6 +60,7 @@ class RemoteDBConnection(DBConnection):
|
||||
read_timeout=read_timeout,
|
||||
)
|
||||
self._request_thread_pool = request_thread_pool
|
||||
self._table_cache = TTLCache(maxsize=10000, ttl=300)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteConnect(name={self.db_name})"
|
||||
@@ -89,6 +90,7 @@ class RemoteDBConnection(DBConnection):
|
||||
else:
|
||||
break
|
||||
for item in result:
|
||||
self._table_cache[item] = True
|
||||
yield item
|
||||
|
||||
@override
|
||||
@@ -109,16 +111,10 @@ class RemoteDBConnection(DBConnection):
|
||||
self._client.mount_retry_adapter_for_table(name)
|
||||
|
||||
# check if table exists
|
||||
try:
|
||||
if self._table_cache.get(name) is None:
|
||||
self._client.post(f"/v1/table/{name}/describe/")
|
||||
except LanceDBClientError as err:
|
||||
if str(err).startswith("Not found"):
|
||||
logging.error(
|
||||
"Table %s does not exist. Please first call "
|
||||
"db.create_table(%s, data).",
|
||||
name,
|
||||
name,
|
||||
)
|
||||
self._table_cache[name] = True
|
||||
|
||||
return RemoteTable(self, name)
|
||||
|
||||
@override
|
||||
@@ -267,6 +263,7 @@ class RemoteDBConnection(DBConnection):
|
||||
content_type=ARROW_STREAM_CONTENT_TYPE,
|
||||
)
|
||||
|
||||
self._table_cache[name] = True
|
||||
return RemoteTable(self, name)
|
||||
|
||||
@override
|
||||
@@ -282,6 +279,25 @@ class RemoteDBConnection(DBConnection):
|
||||
self._client.post(
|
||||
f"/v1/table/{name}/drop/",
|
||||
)
|
||||
self._table_cache.pop(name)
|
||||
|
||||
@override
|
||||
def rename_table(self, cur_name: str, new_name: str):
|
||||
"""Rename a table in the database.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cur_name: str
|
||||
The current name of the table.
|
||||
new_name: str
|
||||
The new name of the table.
|
||||
"""
|
||||
self._client.post(
|
||||
f"/v1/table/{cur_name}/rename/",
|
||||
json={"new_table_name": new_name},
|
||||
)
|
||||
self._table_cache.pop(cur_name)
|
||||
self._table_cache[new_name] = True
|
||||
|
||||
async def close(self):
|
||||
"""Close the connection to the database."""
|
||||
|
||||
@@ -72,7 +72,7 @@ class RemoteTable(Table):
|
||||
return resp
|
||||
|
||||
def index_stats(self, index_uuid: str):
|
||||
"""List all the indices on the table"""
|
||||
"""List all the stats of a specificied index"""
|
||||
resp = self._conn._client.post(
|
||||
f"/v1/table/{self._name}/index/{index_uuid}/stats/"
|
||||
)
|
||||
|
||||
@@ -46,10 +46,18 @@ impl VectorIndex {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct VectorIndexMetadata {
|
||||
pub metric_type: String,
|
||||
pub index_type: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct VectorIndexStatistics {
|
||||
pub num_indexed_rows: usize,
|
||||
pub num_unindexed_rows: usize,
|
||||
pub index_type: String,
|
||||
pub indices: Vec<VectorIndexMetadata>,
|
||||
}
|
||||
|
||||
/// Builder for an IVF PQ index.
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! LanceDB Table APIs
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -757,6 +758,8 @@ pub struct NativeTable {
|
||||
// the object store wrapper to use on write path
|
||||
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
|
||||
storage_options: HashMap<String, String>,
|
||||
|
||||
// This comes from the connection options. We store here so we can pass down
|
||||
// to the dataset when we recreate it (for example, in checkout_latest).
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
@@ -822,6 +825,13 @@ impl NativeTable {
|
||||
None => params,
|
||||
};
|
||||
|
||||
let storage_options = params
|
||||
.store_options
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.storage_options
|
||||
.unwrap_or_default();
|
||||
|
||||
let dataset = DatasetBuilder::from_uri(uri)
|
||||
.with_read_params(params)
|
||||
.load()
|
||||
@@ -840,6 +850,7 @@ impl NativeTable {
|
||||
uri: uri.to_string(),
|
||||
dataset,
|
||||
store_wrapper: write_store_wrapper,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
})
|
||||
}
|
||||
@@ -908,6 +919,13 @@ impl NativeTable {
|
||||
None => params,
|
||||
};
|
||||
|
||||
let storage_options = params
|
||||
.store_params
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.storage_options
|
||||
.unwrap_or_default();
|
||||
|
||||
let dataset = Dataset::write(batches, uri, Some(params))
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
@@ -921,6 +939,7 @@ impl NativeTable {
|
||||
uri: uri.to_string(),
|
||||
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
|
||||
store_wrapper: write_store_wrapper,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
})
|
||||
}
|
||||
@@ -1042,6 +1061,26 @@ impl NativeTable {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index_type(&self, index_uuid: &str) -> Result<Option<String>> {
|
||||
match self.load_index_stats(index_uuid).await? {
|
||||
Some(stats) => Ok(Some(stats.index_type)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_distance_type(&self, index_uuid: &str) -> Result<Option<String>> {
|
||||
match self.load_index_stats(index_uuid).await? {
|
||||
Some(stats) => Ok(Some(
|
||||
stats
|
||||
.indices
|
||||
.iter()
|
||||
.map(|i| i.metric_type.clone())
|
||||
.collect(),
|
||||
)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn load_indices(&self) -> Result<Vec<VectorIndex>> {
|
||||
let dataset = self.dataset.get().await?;
|
||||
let (indices, mf) = futures::try_join!(dataset.load_indices(), dataset.latest_manifest())?;
|
||||
@@ -1312,7 +1351,7 @@ impl TableInternal for NativeTable {
|
||||
add: AddDataBuilder<NoData>,
|
||||
data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<()> {
|
||||
let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
|
||||
let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
|
||||
mode: match add.mode {
|
||||
AddDataMode::Append => WriteMode::Append,
|
||||
AddDataMode::Overwrite => WriteMode::Overwrite,
|
||||
@@ -1320,6 +1359,18 @@ impl TableInternal for NativeTable {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
// Bring storage options from table
|
||||
let storage_options = lance_params
|
||||
.store_params
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options
|
||||
.get_or_insert(Default::default());
|
||||
for (key, value) in self.storage_options.iter() {
|
||||
if !storage_options.contains_key(key) {
|
||||
storage_options.insert(key.clone(), value.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// patch the params if we have a write store wrapper
|
||||
let lance_params = match self.store_wrapper.clone() {
|
||||
Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?,
|
||||
|
||||
Reference in New Issue
Block a user