From 44878dd9a5d4623b43f87176c75b26a4a9ce31dc Mon Sep 17 00:00:00 2001 From: Jonathan Hsieh Date: Tue, 2 Dec 2025 13:57:00 -0800 Subject: [PATCH] feat: support stable row IDs via storage_options (#2831) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for enabling stable row IDs when creating tables via the `new_table_enable_stable_row_ids` storage option. Stable row IDs ensure that row identifiers remain constant after compaction, update, delete, and merge operations. This is useful for materialized views and other use cases that need to track source rows across these operations. The option can be set at two levels: - Connection level: applies to all tables created with that connection - Table level: per-table override via create_table storage_options 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude --- python/python/lancedb/db.py | 8 + python/python/tests/test_db.py | 144 ++++++++++++ rust/lancedb/src/database/listing.rs | 321 ++++++++++++++++++++++++++- 3 files changed, 467 insertions(+), 6 deletions(-) diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index b58ab852..38a7a334 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -194,6 +194,10 @@ class DBConnection(EnforceOverrides): connection will be inherited by the table, but can be overridden here. See available options at + + To enable stable row IDs (row IDs remain stable after compaction, + update, delete, and merges), set `new_table_enable_stable_row_ids` + to `"true"` in storage_options when connecting to the database. data_storage_version: optional, str, default "stable" Deprecated. Set `storage_options` when connecting to the database and set `new_table_data_storage_version` in the options. @@ -1079,6 +1083,10 @@ class AsyncConnection(object): See available options at + To enable stable row IDs (row IDs remain stable after compaction, + update, delete, and merges), set `new_table_enable_stable_row_ids` + to `"true"` in storage_options when connecting to the database. + Returns ------- AsyncTable diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index f3d002cf..3aadba0b 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -441,6 +441,150 @@ async def test_create_table_v2_manifest_paths_async(tmp_path): assert re.match(r"\d{20}\.manifest", manifest) +@pytest.mark.asyncio +async def test_create_table_stable_row_ids_via_storage_options(tmp_path): + """Test stable_row_ids via storage_options at connect time.""" + import lance + + # Connect with stable row IDs enabled as default for new tables + db_with = await lancedb.connect_async( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"} + ) + # Connect without stable row IDs (default) + db_without = await lancedb.connect_async( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "false"} + ) + + # Create table using connection with stable row IDs enabled + await db_with.create_table( + "with_stable_via_opts", + data=[{"id": i} for i in range(10)], + ) + lance_ds_with = lance.dataset(tmp_path / "with_stable_via_opts.lance") + fragments_with = lance_ds_with.get_fragments() + assert len(fragments_with) > 0 + assert fragments_with[0].metadata.row_id_meta is not None + + # Create table using connection without stable row IDs + await db_without.create_table( + "without_stable_via_opts", + data=[{"id": i} for i in range(10)], + ) + lance_ds_without = lance.dataset(tmp_path / "without_stable_via_opts.lance") + fragments_without = lance_ds_without.get_fragments() + assert len(fragments_without) > 0 + assert fragments_without[0].metadata.row_id_meta is None + + +def test_create_table_stable_row_ids_via_storage_options_sync(tmp_path): + """Test that enable_stable_row_ids can be set via storage_options (sync API).""" + # Connect with stable row IDs enabled as default for new tables + db_with = lancedb.connect( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"} + ) + # Connect without stable row IDs (default) + db_without = lancedb.connect( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "false"} + ) + + # Create table using connection with stable row IDs enabled + tbl_with = db_with.create_table( + "with_stable_sync", + data=[{"id": i} for i in range(10)], + ) + lance_ds_with = tbl_with.to_lance() + fragments_with = lance_ds_with.get_fragments() + assert len(fragments_with) > 0 + assert fragments_with[0].metadata.row_id_meta is not None + + # Create table using connection without stable row IDs + tbl_without = db_without.create_table( + "without_stable_sync", + data=[{"id": i} for i in range(10)], + ) + lance_ds_without = tbl_without.to_lance() + fragments_without = lance_ds_without.get_fragments() + assert len(fragments_without) > 0 + assert fragments_without[0].metadata.row_id_meta is None + + +@pytest.mark.asyncio +async def test_create_table_stable_row_ids_table_level_override(tmp_path): + """Test that stable_row_ids can be enabled/disabled at create_table level.""" + import lance + + # Connect without any stable row ID setting + db_default = await lancedb.connect_async(tmp_path) + + # Connect with stable row IDs enabled at connection level + db_with_stable = await lancedb.connect_async( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"} + ) + + # Case 1: No connection setting, enable at table level + await db_default.create_table( + "table_level_enabled", + data=[{"id": i} for i in range(10)], + storage_options={"new_table_enable_stable_row_ids": "true"}, + ) + lance_ds = lance.dataset(tmp_path / "table_level_enabled.lance") + fragments = lance_ds.get_fragments() + assert len(fragments) > 0 + assert fragments[0].metadata.row_id_meta is not None, ( + "Table should have stable row IDs when enabled at table level" + ) + + # Case 2: Connection has stable row IDs, override with false at table level + await db_with_stable.create_table( + "table_level_disabled", + data=[{"id": i} for i in range(10)], + storage_options={"new_table_enable_stable_row_ids": "false"}, + ) + lance_ds = lance.dataset(tmp_path / "table_level_disabled.lance") + fragments = lance_ds.get_fragments() + assert len(fragments) > 0 + assert fragments[0].metadata.row_id_meta is None, ( + "Table should NOT have stable row IDs when disabled at table level" + ) + + +def test_create_table_stable_row_ids_table_level_override_sync(tmp_path): + """Test that stable_row_ids can be enabled/disabled at create_table level (sync).""" + # Connect without any stable row ID setting + db_default = lancedb.connect(tmp_path) + + # Connect with stable row IDs enabled at connection level + db_with_stable = lancedb.connect( + tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"} + ) + + # Case 1: No connection setting, enable at table level + tbl = db_default.create_table( + "table_level_enabled_sync", + data=[{"id": i} for i in range(10)], + storage_options={"new_table_enable_stable_row_ids": "true"}, + ) + lance_ds = tbl.to_lance() + fragments = lance_ds.get_fragments() + assert len(fragments) > 0 + assert fragments[0].metadata.row_id_meta is not None, ( + "Table should have stable row IDs when enabled at table level" + ) + + # Case 2: Connection has stable row IDs, override with false at table level + tbl = db_with_stable.create_table( + "table_level_disabled_sync", + data=[{"id": i} for i in range(10)], + storage_options={"new_table_enable_stable_row_ids": "false"}, + ) + lance_ds = tbl.to_lance() + fragments = lance_ds.get_fragments() + assert len(fragments) > 0 + assert fragments[0].metadata.row_id_meta is None, ( + "Table should NOT have stable row IDs when disabled at table level" + ) + + def test_open_table_sync(tmp_db: lancedb.DBConnection): tmp_db.create_table("test", data=[{"id": 0}]) assert tmp_db.open_table("test").count_rows() == 1 diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 18fc991f..a740cd30 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -35,6 +35,7 @@ pub const LANCE_FILE_EXTENSION: &str = "lance"; pub const OPT_NEW_TABLE_STORAGE_VERSION: &str = "new_table_data_storage_version"; pub const OPT_NEW_TABLE_V2_MANIFEST_PATHS: &str = "new_table_enable_v2_manifest_paths"; +pub const OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS: &str = "new_table_enable_stable_row_ids"; /// Controls how new tables should be created #[derive(Clone, Debug, Default)] @@ -48,6 +49,12 @@ pub struct NewTableConfig { /// V2 manifest paths are more efficient than V2 manifest paths but are not /// supported by old clients. pub enable_v2_manifest_paths: Option, + /// Whether to enable stable row IDs for new tables + /// + /// When enabled, row IDs remain stable after compaction, update, delete, + /// and merges. This is useful for materialized views and other use cases + /// that need to track source rows across these operations. + pub enable_stable_row_ids: Option, } /// Options specific to the listing database @@ -87,6 +94,14 @@ impl ListingDatabaseOptions { }) }) .transpose()?, + enable_stable_row_ids: map + .get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS) + .map(|s| { + s.parse::().map_err(|_| Error::InvalidInput { + message: format!("enable_stable_row_ids must be a boolean, received {}", s), + }) + }) + .transpose()?, }; // We just assume that any options that are not new table config options are storage options let storage_options = map @@ -94,6 +109,7 @@ impl ListingDatabaseOptions { .filter(|(key, _)| { key.as_str() != OPT_NEW_TABLE_STORAGE_VERSION && key.as_str() != OPT_NEW_TABLE_V2_MANIFEST_PATHS + && key.as_str() != OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS }) .map(|(key, value)| (key.clone(), value.clone())) .collect(); @@ -118,6 +134,12 @@ impl DatabaseOptions for ListingDatabaseOptions { enable_v2_manifest_paths.to_string(), ); } + if let Some(enable_stable_row_ids) = self.new_table_config.enable_stable_row_ids { + map.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + enable_stable_row_ids.to_string(), + ); + } } } @@ -497,7 +519,7 @@ impl ListingDatabase { fn extract_storage_overrides( &self, request: &CreateTableRequest, - ) -> Result<(Option, Option)> { + ) -> Result<(Option, Option, Option)> { let storage_options = request .write_options .lance_write_params @@ -518,7 +540,19 @@ impl ListingDatabase { message: "enable_v2_manifest_paths must be a boolean".to_string(), })?; - Ok((storage_version_override, v2_manifest_override)) + let stable_row_ids_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS)) + .map(|s| s.parse::()) + .transpose() + .map_err(|_| Error::InvalidInput { + message: "enable_stable_row_ids must be a boolean".to_string(), + })?; + + Ok(( + storage_version_override, + v2_manifest_override, + stable_row_ids_override, + )) } /// Prepare write parameters for table creation @@ -527,6 +561,7 @@ impl ListingDatabase { request: &CreateTableRequest, storage_version_override: Option, v2_manifest_override: Option, + stable_row_ids_override: Option, ) -> lance::dataset::WriteParams { let mut write_params = request .write_options @@ -571,6 +606,13 @@ impl ListingDatabase { write_params.enable_v2_manifest_paths = enable_v2_manifest_paths; } + // Apply enable_stable_row_ids: table-level override takes precedence over connection config + if let Some(enable_stable_row_ids) = + stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids) + { + write_params.enable_stable_row_ids = enable_stable_row_ids; + } + if matches!(&request.mode, CreateTableMode::Overwrite) { write_params.mode = WriteMode::Overwrite; } @@ -706,11 +748,15 @@ impl Database for ListingDatabase { .clone() .unwrap_or_else(|| self.table_uri(&request.name).unwrap()); - let (storage_version_override, v2_manifest_override) = + let (storage_version_override, v2_manifest_override, stable_row_ids_override) = self.extract_storage_overrides(&request)?; - let write_params = - self.prepare_write_params(&request, storage_version_override, v2_manifest_override); + let write_params = self.prepare_write_params( + &request, + storage_version_override, + v2_manifest_override, + stable_row_ids_override, + ); let data_schema = request.data.arrow_schema(); @@ -921,7 +967,7 @@ impl Database for ListingDatabase { mod tests { use super::*; use crate::connection::ConnectRequest; - use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest}; + use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest, WriteOptions}; use crate::table::{Table, TableDefinition}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; @@ -1621,4 +1667,267 @@ mod tests { // Cloned table should have all 8 rows from the latest version assert_eq!(cloned_table.count_rows(None).await.unwrap(), 8); } + + #[tokio::test] + async fn test_create_table_with_stable_row_ids_connection_level() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + // Create database with stable row IDs enabled at connection level + let mut options = HashMap::new(); + options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "true".to_string(), + ); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options, + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + + // Verify the config was parsed correctly + assert_eq!(db.new_table_config.enable_stable_row_ids, Some(true)); + + // Create a table - it should inherit the stable row IDs setting + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch)], + schema.clone(), + )); + + let table = db + .create_table(CreateTableRequest { + name: "test_stable".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options: Default::default(), + location: None, + }) + .await + .unwrap(); + + // Verify table was created successfully + assert_eq!(table.count_rows(None).await.unwrap(), 3); + } + + #[tokio::test] + async fn test_create_table_with_stable_row_ids_table_level() { + let (_tempdir, db) = setup_database().await; + + // Verify connection has no stable row IDs config + assert_eq!(db.new_table_config.enable_stable_row_ids, None); + + // Create a table with stable row IDs enabled at table level via storage_options + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch)], + schema.clone(), + )); + + let mut storage_options = HashMap::new(); + storage_options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "true".to_string(), + ); + + let write_options = WriteOptions { + lance_write_params: Some(lance::dataset::WriteParams { + store_params: Some(lance::io::ObjectStoreParams { + storage_options: Some(storage_options), + ..Default::default() + }), + ..Default::default() + }), + }; + + let table = db + .create_table(CreateTableRequest { + name: "test_stable_table_level".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options, + location: None, + }) + .await + .unwrap(); + + // Verify table was created successfully + assert_eq!(table.count_rows(None).await.unwrap(), 3); + } + + #[tokio::test] + async fn test_create_table_stable_row_ids_table_overrides_connection() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + // Create database with stable row IDs enabled at connection level + let mut options = HashMap::new(); + options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "true".to_string(), + ); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options, + read_consistency_interval: None, + session: None, + }; + + let db = ListingDatabase::connect_with_options(&request) + .await + .unwrap(); + + assert_eq!(db.new_table_config.enable_stable_row_ids, Some(true)); + + // Create table with stable row IDs disabled at table level (overrides connection) + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let reader = Box::new(arrow_array::RecordBatchIterator::new( + vec![Ok(batch)], + schema.clone(), + )); + + let mut storage_options = HashMap::new(); + storage_options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "false".to_string(), + ); + + let write_options = WriteOptions { + lance_write_params: Some(lance::dataset::WriteParams { + store_params: Some(lance::io::ObjectStoreParams { + storage_options: Some(storage_options), + ..Default::default() + }), + ..Default::default() + }), + }; + + let table = db + .create_table(CreateTableRequest { + name: "test_override".to_string(), + namespace: vec![], + data: CreateTableData::Data(reader), + mode: CreateTableMode::Create, + write_options, + location: None, + }) + .await + .unwrap(); + + // Verify table was created successfully + assert_eq!(table.count_rows(None).await.unwrap(), 3); + } + + #[tokio::test] + async fn test_stable_row_ids_invalid_value() { + let tempdir = tempdir().unwrap(); + let uri = tempdir.path().to_str().unwrap(); + + // Try to create database with invalid stable row IDs value + let mut options = HashMap::new(); + options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "not_a_boolean".to_string(), + ); + + let request = ConnectRequest { + uri: uri.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options, + read_consistency_interval: None, + session: None, + }; + + let result = ListingDatabase::connect_with_options(&request).await; + + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + Error::InvalidInput { message } if message.contains("enable_stable_row_ids must be a boolean") + )); + } + + #[test] + fn test_stable_row_ids_config_serialization() { + // Test that ListingDatabaseOptions correctly serializes stable_row_ids + let mut options = HashMap::new(); + options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "true".to_string(), + ); + + // Parse the options + let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap(); + assert_eq!( + db_options.new_table_config.enable_stable_row_ids, + Some(true) + ); + + // Serialize back to map + let mut serialized = HashMap::new(); + db_options.serialize_into_map(&mut serialized); + + assert_eq!( + serialized.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS), + Some(&"true".to_string()) + ); + } + + #[test] + fn test_stable_row_ids_config_parse_false() { + let mut options = HashMap::new(); + options.insert( + OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(), + "false".to_string(), + ); + + let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap(); + assert_eq!( + db_options.new_table_config.enable_stable_row_ids, + Some(false) + ); + } + + #[test] + fn test_stable_row_ids_config_not_set() { + let options = HashMap::new(); + + let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap(); + assert_eq!(db_options.new_table_config.enable_stable_row_ids, None); + } }