Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
b7efc9c9c0 Bump version: 0.28.0-beta.9 → 0.28.0-beta.10 2026-04-28 13:29:17 +00:00
15 changed files with 22 additions and 552 deletions

10
Cargo.lock generated
View File

@@ -4476,11 +4476,9 @@ dependencies = [
"arrow-schema",
"async-trait",
"axum",
"base64 0.22.1",
"bytes",
"chrono",
"futures",
"hmac",
"lance",
"lance-core",
"lance-index",
@@ -4490,12 +4488,10 @@ dependencies = [
"lance-table",
"log",
"object_store",
"quick-xml 0.38.4",
"rand 0.9.4",
"reqwest",
"serde",
"serde_json",
"sha2",
"snafu 0.9.0",
"tokio",
"tower",
@@ -4580,7 +4576,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.28.0-beta.10"
version = "0.28.0-beta.9"
dependencies = [
"ahash",
"anyhow",
@@ -4662,7 +4658,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.28.0-beta.10"
version = "0.28.0-beta.9"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4684,7 +4680,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.31.0-beta.10"
version = "0.31.0-beta.9"
dependencies = [
"arrow",
"async-trait",

View File

@@ -41,29 +41,6 @@ for testing purposes.
***
### manifestEnabled?
```ts
optional manifestEnabled: boolean;
```
(For LanceDB OSS only): use directory namespace manifests as the source
of truth for table metadata. Existing directory-listed root tables are
migrated into the manifest on access.
***
### namespaceClientProperties?
```ts
optional namespaceClientProperties: Record<string, string>;
```
(For LanceDB OSS only): extra properties for the backing namespace
client used by manifest-enabled native connections.
***
### readConsistencyInterval?
```ts

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.10",
"version": "0.28.0-beta.9",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.10",
"version": "0.28.0-beta.9",
"cpu": [
"x64",
"arm64"

View File

@@ -67,12 +67,6 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(manifest_enabled) = options.manifest_enabled {
builder = builder.manifest_enabled(manifest_enabled);
}
if let Some(namespace_client_properties) = options.namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
// Create client config, optionally with header provider
let client_config = options.client_config.unwrap_or_default();

View File

@@ -37,13 +37,6 @@ pub struct ConnectionOptions {
///
/// The available options are described at https://docs.lancedb.com/storage/
pub storage_options: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): use directory namespace manifests as the source
/// of truth for table metadata. Existing directory-listed root tables are
/// migrated into the manifest on access.
pub manifest_enabled: Option<bool>,
/// (For LanceDB OSS only): extra properties for the backing namespace
/// client used by manifest-enabled native connections.
pub namespace_client_properties: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): the session to use for this connection. Holds
/// shared caches and other session-specific state.
pub session: Option<session::Session>,

View File

@@ -73,7 +73,6 @@ def connect(
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
@@ -112,10 +111,6 @@ def connect(
storage_options: dict, optional
Additional options for the storage backend. See available options at
<https://docs.lancedb.com/storage/>
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
session: Session, optional
(For LanceDB OSS only)
A session to use for this connection. Sessions allow you to configure
@@ -163,11 +158,11 @@ def connect(
conn : DBConnection
A connection to a LanceDB database.
"""
if namespace_client_impl is not None:
if namespace_client_properties is None:
if namespace_client_impl is not None or namespace_client_properties is not None:
if namespace_client_impl is None or namespace_client_properties is None:
raise ValueError(
"namespace_client_properties must be provided when "
"namespace_client_impl is set"
"Both namespace_client_impl and "
"namespace_client_properties must be provided"
)
if kwargs:
raise ValueError(f"Unknown keyword arguments: {kwargs}")
@@ -180,12 +175,6 @@ def connect(
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
)
if namespace_client_properties is not None and not manifest_enabled:
raise ValueError(
"namespace_client_impl must be provided when using "
"namespace_client_properties unless manifest_enabled=True"
)
if namespace_client_pushdown_operations is not None:
raise ValueError(
"namespace_client_pushdown_operations is only valid when "
@@ -223,8 +212,6 @@ def connect(
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
manifest_enabled=manifest_enabled,
namespace_client_properties=namespace_client_properties,
)
@@ -302,8 +289,6 @@ def deserialize_conn(
parsed["uri"],
read_consistency_interval=rci,
storage_options=storage_options,
manifest_enabled=parsed.get("manifest_enabled", False),
namespace_client_properties=parsed.get("namespace_client_properties"),
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")
@@ -319,8 +304,6 @@ async def connect_async(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> AsyncConnection:
"""Connect to a LanceDB database.
@@ -360,13 +343,6 @@ async def connect_async(
cache sizes for index and metadata caches, which can significantly
impact memory use and performance. They can also be re-used across
multiple connections to share the same cache state.
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
namespace_client_properties : dict, optional
Additional directory namespace client properties to use with
``manifest_enabled=True``.
Examples
--------
@@ -409,8 +385,6 @@ async def connect_async(
client_config,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
)

View File

@@ -242,8 +242,6 @@ async def connect(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
storage_options: Optional[Dict[str, str]],
session: Optional[Session],
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> Connection: ...
class RecordBatchStream:

View File

@@ -590,13 +590,8 @@ class LanceDBConnection(DBConnection):
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[LanceDbConnection] = None,
):
self.storage_options = storage_options
self._manifest_enabled = manifest_enabled
self._namespace_client_properties = namespace_client_properties
if _inner is not None:
self._conn = _inner
self._cached_namespace_client = None
@@ -638,8 +633,6 @@ class LanceDBConnection(DBConnection):
None,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
# TODO: It would be nice if we didn't store self.storage_options but it is
@@ -647,6 +640,7 @@ class LanceDBConnection(DBConnection):
# work because some paths like LanceDBConnection.from_inner will lose the
# storage_options. Also, this class really shouldn't be holding any state
# beyond _conn.
self.storage_options = storage_options
self._conn = AsyncConnection(LOOP.run(do_connect()))
self._cached_namespace_client: Optional[LanceNamespace] = None
@@ -683,8 +677,6 @@ class LanceDBConnection(DBConnection):
"connection_type": "local",
"uri": self.uri,
"storage_options": self.storage_options,
"manifest_enabled": self._manifest_enabled,
"namespace_client_properties": self._namespace_client_properties,
"read_consistency_interval_seconds": (
rci.total_seconds() if rci else None
),

View File

@@ -779,24 +779,6 @@ class Permutation:
batch = LOOP.run(do_getitems())
return self.transform_fn(batch)
def fetch(self, indices: list[int]) -> Any:
"""
Fetch rows from the permutation
returns the rows for the given permutation in the same shape as configured by
with_format / with_transform.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("memory:///")
>>> tbl = db.create_table("tbl", data=[{"x": x} for x in range(10)])
>>> perm = Permutation.identity(tbl)
>>> perm.fetch([0, 5, 9])
[{'x': 0}, {'x': 5}, {'x': 9}]
"""
return self.__getitems__(indices)
@deprecated(details="Use with_skip instead")
def skip(self, skip: int) -> "Permutation":
"""

View File

@@ -1095,8 +1095,3 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
"""Test __getitems__ with an out-of-range offset raises an error."""
with pytest.raises(Exception):
some_permutation.__getitems__([999999])
def test_fetch_matches_getitems(some_permutation: Permutation):
indices = [0, 1, 2, 10, 100]
assert some_permutation.fetch(indices) == some_permutation.__getitems__(indices)

View File

@@ -525,7 +525,7 @@ impl Connection {
}
#[pyfunction]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
py: Python<'_>,
@@ -537,8 +537,6 @@ pub fn connect(
client_config: Option<PyClientConfig>,
storage_options: Option<HashMap<String, String>>,
session: Option<crate::session::Session>,
manifest_enabled: bool,
namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
future_into_py(py, async move {
let mut builder = lancedb::connect(&uri);
@@ -558,12 +556,6 @@ pub fn connect(
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
if manifest_enabled {
builder = builder.manifest_enabled(true);
}
if let Some(namespace_client_properties) = namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
#[cfg(feature = "remote")]
if let Some(client_config) = client_config {
builder = builder.client_config(client_config.into());

View File

@@ -111,12 +111,7 @@ default = []
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
azure = [
"lance/azure",
"lance-io/azure",
"lance-namespace-impls/dir-azure",
"lance-namespace-impls/credential-vendor-azure",
]
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
huggingface = [
"lance/huggingface",
"lance-io/huggingface",

View File

@@ -590,15 +590,6 @@ pub struct ConnectRequest {
/// storage options.
pub namespace_client_properties: HashMap<String, String>,
/// Use directory namespace manifests as the source of truth for native
/// LanceDB table metadata.
///
/// When enabled for a local/native connection, LanceDB returns a
/// namespace-backed database directly. Directory listing fallback remains
/// enabled for migration, and directory-listing-to-manifest migration is
/// forced on.
pub manifest_enabled: bool,
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
@@ -639,7 +630,6 @@ impl ConnectBuilder {
read_consistency_interval: None,
options: HashMap::new(),
namespace_client_properties: HashMap::new(),
manifest_enabled: false,
session: None,
},
embedding_registry: None,
@@ -801,17 +791,6 @@ impl ConnectBuilder {
self
}
/// Enable or disable manifest-backed directory namespace mode for local
/// native connections.
///
/// When enabled, the connection uses the directory namespace database
/// directly for all table operations and forces
/// `dir_listing_to_manifest_migration_enabled=true`.
pub fn manifest_enabled(mut self, enabled: bool) -> Self {
self.request.manifest_enabled = enabled;
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
@@ -907,16 +886,6 @@ impl ConnectBuilder {
pub async fn execute(self) -> Result<Connection> {
if self.request.uri.starts_with("db") {
self.execute_remote()
} else if self.request.manifest_enabled {
let internal = Arc::new(
ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?,
);
Ok(Connection {
internal,
embedding_registry: self
.embedding_registry
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
})
} else {
let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?);
Ok(Connection {
@@ -1163,9 +1132,6 @@ mod tests {
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use tempfile::tempdir;
use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS};
use crate::database::namespace::LanceNamespaceDatabase;
use crate::table::NativeTable;
use crate::test_utils::connection::new_test_connection;
use super::*;
@@ -1238,147 +1204,6 @@ mod tests {
);
}
#[tokio::test]
async fn test_connect_with_manifest_enabled_uses_directory_namespace() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri)
.manifest_enabled(true)
.storage_option("timeout", "30s")
.namespace_client_property("manifest_enabled", "false")
.namespace_client_property("dir_listing_to_manifest_migration_enabled", "false")
.execute()
.await
.unwrap();
assert!(
db.database()
.as_any()
.downcast_ref::<LanceNamespaceDatabase>()
.is_some()
);
assert_eq!(db.uri(), uri);
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
assert_eq!(ns_impl, "dir");
assert_eq!(properties.get("root"), Some(&uri.to_string()));
assert_eq!(
properties.get("manifest_enabled"),
Some(&"true".to_string())
);
assert_eq!(
properties.get("dir_listing_to_manifest_migration_enabled"),
Some(&"true".to_string())
);
assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string()));
}
#[tokio::test]
async fn test_manifest_enabled_rejects_commit_engine_uri() {
let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled s3+ddb connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes"))
);
let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled engine query connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine"))
);
}
#[tokio::test]
async fn test_manifest_enabled_connection_migrates_root_listing_table() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
connect(uri)
.execute()
.await
.unwrap()
.create_empty_table("legacy", schema)
.execute()
.await
.unwrap();
let db = connect(uri).manifest_enabled(true).execute().await.unwrap();
let tables = db.table_names().execute().await.unwrap();
assert_eq!(tables, vec!["legacy".to_string()]);
db.open_table("legacy").execute().await.unwrap();
}
#[tokio::test]
async fn test_manifest_enabled_preserves_new_table_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let options = ListingDatabaseOptions::builder()
.enable_v2_manifest_paths(true)
.build();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let table = connect(uri)
.manifest_enabled(true)
.database_options(&options)
.execute()
.await
.unwrap()
.create_empty_table("v1_manifest", schema)
.storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false")
.execute()
.await
.unwrap();
let native_table = table
.base_table()
.as_any()
.downcast_ref::<NativeTable>()
.unwrap();
assert!(!native_table.uses_v2_manifest_paths().await.unwrap());
}
#[tokio::test]
async fn test_manifest_enabled_vend_input_storage_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let table = connect(uri)
.manifest_enabled(true)
.storage_option("test_storage_option", "test_value")
.namespace_client_property("vend_input_storage_options", "true")
.namespace_client_property(
"vend_input_storage_options_refresh_interval_millis",
"60000",
)
.execute()
.await
.unwrap()
.create_empty_table("vended", schema)
.execute()
.await
.unwrap();
let storage_options = table.latest_storage_options().await.unwrap().unwrap();
assert_eq!(
storage_options.get("test_storage_option"),
Some(&"test_value".to_string())
);
assert!(storage_options.contains_key("expires_at_millis"));
}
#[tokio::test]
async fn test_table_names() {
let tc = new_test_connection().await.unwrap();

View File

@@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl ListingDatabase {
pub(crate) fn build_namespace_client_properties(
fn build_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
@@ -298,24 +298,6 @@ impl ListingDatabase {
properties
}
pub(crate) fn build_manifest_enabled_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
) -> HashMap<String, String> {
let mut properties = Self::build_namespace_client_properties(
uri,
storage_options,
namespace_client_properties,
);
properties.insert("manifest_enabled".to_string(), "true".to_string());
properties.insert(
"dir_listing_to_manifest_migration_enabled".to_string(),
"true".to_string(),
);
properties
}
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
@@ -341,119 +323,6 @@ impl ListingDatabase {
))
}
async fn prepare_namespace_root(
uri: &str,
storage_options: &HashMap<String, String>,
session: Arc<lance::session::Session>,
) -> Result<String> {
match url::Url::parse(uri) {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
uri,
&ObjectStoreParams::default(),
)
.await?;
if object_store.is_local() {
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
}
Ok(uri.to_string())
}
Ok(mut url) => {
if url.scheme().contains('+') {
return Err(Error::NotSupported {
message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(),
});
}
for (key, value) in url.query_pairs() {
if key == ENGINE {
return Err(Error::NotSupported {
message: format!(
"commit engine '{}' is not supported for manifest-enabled namespace connections",
value
),
});
} else if key == MIRRORED_STORE {
return Err(Error::NotSupported {
message: "mirrored store is not supported for manifest-enabled namespace connections"
.to_string(),
});
}
}
url.set_query(None);
let plain_uri = url.to_string();
let os_params = ObjectStoreParams {
storage_options_accessor: if storage_options.is_empty() {
None
} else {
Some(Arc::new(StorageOptionsAccessor::with_static_options(
storage_options.clone(),
)))
},
..Default::default()
};
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
&plain_uri,
&os_params,
)
.await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu {
path: plain_uri.clone(),
})?;
}
Ok(plain_uri)
}
Err(_) => {
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
uri,
&ObjectStoreParams::default(),
)
.await?;
if object_store.is_local() {
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
}
Ok(uri.to_string())
}
}
}
pub(crate) async fn connect_manifest_enabled_namespace_database(
request: &ConnectRequest,
) -> Result<LanceNamespaceDatabase> {
let options = ListingDatabaseOptions::parse_from_map(&request.options)?;
let session = request
.session
.clone()
.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
let namespace_root =
Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone())
.await?;
let ns_properties = Self::build_manifest_enabled_namespace_client_properties(
&namespace_root,
&options.storage_options,
request.namespace_client_properties.clone(),
);
LanceNamespaceDatabase::connect_with_new_table_config(
"dir",
ns_properties,
options.storage_options,
request.read_consistency_interval,
Some(session),
HashSet::new(),
options.new_table_config,
)
.await
.map(|db| db.with_uri(request.uri.clone()))
}
/// Connect to a listing database
///
/// The URI should be a path to a directory where the tables are stored.
@@ -821,12 +690,15 @@ impl ListingDatabase {
store_params.storage_options_accessor = Some(Arc::new(accessor));
}
write_params.data_storage_version = storage_version_override
.or(write_params.data_storage_version)
.or(self.new_table_config.data_storage_version);
write_params.data_storage_version = self
.new_table_config
.data_storage_version
.or(storage_version_override);
if let Some(enable_v2_manifest_paths) =
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
if let Some(enable_v2_manifest_paths) = self
.new_table_config
.enable_v2_manifest_paths
.or(v2_manifest_override)
{
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
@@ -1286,7 +1158,6 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1421,7 +1292,6 @@ mod tests {
client_config: Default::default(),
options: options.clone(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1957,7 +1827,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2064,7 +1933,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2137,7 +2005,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2335,7 +2202,6 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties,
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};

View File

@@ -24,10 +24,6 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::database::listing::{
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::error::{Error, Result};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
@@ -54,8 +50,6 @@ pub struct LanceNamespaceDatabase {
ns_impl: String,
// Namespace properties used to construct the namespace client
ns_properties: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
}
impl LanceNamespaceDatabase {
@@ -77,15 +71,9 @@ impl LanceNamespaceDatabase {
pushdown_operations: namespace_client_pushdown_operations,
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
new_table_config: NewTableConfig::default(),
}
}
pub(crate) fn with_uri(mut self, uri: impl Into<String>) -> Self {
self.uri = uri.into();
self
}
pub async fn connect(
ns_impl: &str,
ns_properties: HashMap<String, String>,
@@ -93,27 +81,6 @@ impl LanceNamespaceDatabase {
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
Self::connect_with_new_table_config(
ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
pushdown_operations,
NewTableConfig::default(),
)
.await
}
pub(crate) async fn connect_with_new_table_config(
ns_impl: &str,
ns_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
new_table_config: NewTableConfig,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -135,79 +102,8 @@ impl LanceNamespaceDatabase {
pushdown_operations,
ns_impl: ns_impl.to_string(),
ns_properties,
new_table_config,
})
}
fn extract_storage_overrides(
&self,
request: &DbCreateTableRequest,
) -> Result<(
Option<lance_encoding::version::LanceFileVersion>,
Option<bool>,
Option<bool>,
)> {
let storage_options = request
.write_options
.lance_write_params
.as_ref()
.and_then(|p| p.store_params.as_ref())
.and_then(|sp| sp.storage_options());
let storage_version_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
.map(|s| s.parse::<lance_encoding::version::LanceFileVersion>())
.transpose()?;
let v2_manifest_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS))
.map(|s| s.parse::<bool>())
.transpose()
.map_err(|_| Error::InvalidInput {
message: "enable_v2_manifest_paths must be a boolean".to_string(),
})?;
let stable_row_ids_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS))
.map(|s| s.parse::<bool>())
.transpose()
.map_err(|_| Error::InvalidInput {
message: "enable_stable_row_ids must be a boolean".to_string(),
})?;
Ok((
storage_version_override,
v2_manifest_override,
stable_row_ids_override,
))
}
fn apply_new_table_config(
&self,
params: &mut lance::dataset::WriteParams,
request: &DbCreateTableRequest,
) -> Result<()> {
let (storage_version_override, v2_manifest_override, stable_row_ids_override) =
self.extract_storage_overrides(request)?;
params.data_storage_version = storage_version_override
.or(params.data_storage_version)
.or(self.new_table_config.data_storage_version);
if let Some(enable_v2_manifest_paths) =
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
{
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
{
params.enable_stable_row_ids = enable_stable_row_ids;
}
Ok(())
}
}
impl std::fmt::Debug for LanceNamespaceDatabase {
@@ -403,12 +299,7 @@ impl Database for LanceNamespaceDatabase {
};
// Build write params with storage options and commit handler
let mut params = request
.write_options
.lance_write_params
.clone()
.unwrap_or_default();
self.apply_new_table_config(&mut params, &request)?;
let mut params = request.write_options.lance_write_params.unwrap_or_default();
if matches!(request.mode, CreateTableMode::Overwrite) {
params.mode = WriteMode::Overwrite;