feat: support stable row IDs via storage_options (#2831)

Add support for enabling stable row IDs when creating tables via the
`new_table_enable_stable_row_ids` storage option.

Stable row IDs ensure that row identifiers remain constant after
compaction, update, delete, and merge operations. This is useful for
materialized views and other use cases that need to track source rows
across these operations.

The option can be set at two levels:
- Connection level: applies to all tables created with that connection
- Table level: per-table override via create_table storage_options

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Jonathan Hsieh
2025-12-02 13:57:00 -08:00
committed by GitHub
parent 4b5bb2d76c
commit 44878dd9a5
3 changed files with 467 additions and 6 deletions

View File

@@ -194,6 +194,10 @@ class DBConnection(EnforceOverrides):
connection will be inherited by the table, but can be overridden here.
See available options at
<https://lancedb.com/docs/storage/>
To enable stable row IDs (row IDs remain stable after compaction,
update, delete, and merges), set `new_table_enable_stable_row_ids`
to `"true"` in storage_options when connecting to the database.
data_storage_version: optional, str, default "stable"
Deprecated. Set `storage_options` when connecting to the database and set
`new_table_data_storage_version` in the options.
@@ -1079,6 +1083,10 @@ class AsyncConnection(object):
See available options at
<https://lancedb.com/docs/storage/>
To enable stable row IDs (row IDs remain stable after compaction,
update, delete, and merges), set `new_table_enable_stable_row_ids`
to `"true"` in storage_options when connecting to the database.
Returns
-------
AsyncTable

View File

@@ -441,6 +441,150 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert re.match(r"\d{20}\.manifest", manifest)
@pytest.mark.asyncio
async def test_create_table_stable_row_ids_via_storage_options(tmp_path):
"""Test stable_row_ids via storage_options at connect time."""
import lance
# Connect with stable row IDs enabled as default for new tables
db_with = await lancedb.connect_async(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"}
)
# Connect without stable row IDs (default)
db_without = await lancedb.connect_async(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "false"}
)
# Create table using connection with stable row IDs enabled
await db_with.create_table(
"with_stable_via_opts",
data=[{"id": i} for i in range(10)],
)
lance_ds_with = lance.dataset(tmp_path / "with_stable_via_opts.lance")
fragments_with = lance_ds_with.get_fragments()
assert len(fragments_with) > 0
assert fragments_with[0].metadata.row_id_meta is not None
# Create table using connection without stable row IDs
await db_without.create_table(
"without_stable_via_opts",
data=[{"id": i} for i in range(10)],
)
lance_ds_without = lance.dataset(tmp_path / "without_stable_via_opts.lance")
fragments_without = lance_ds_without.get_fragments()
assert len(fragments_without) > 0
assert fragments_without[0].metadata.row_id_meta is None
def test_create_table_stable_row_ids_via_storage_options_sync(tmp_path):
"""Test that enable_stable_row_ids can be set via storage_options (sync API)."""
# Connect with stable row IDs enabled as default for new tables
db_with = lancedb.connect(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"}
)
# Connect without stable row IDs (default)
db_without = lancedb.connect(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "false"}
)
# Create table using connection with stable row IDs enabled
tbl_with = db_with.create_table(
"with_stable_sync",
data=[{"id": i} for i in range(10)],
)
lance_ds_with = tbl_with.to_lance()
fragments_with = lance_ds_with.get_fragments()
assert len(fragments_with) > 0
assert fragments_with[0].metadata.row_id_meta is not None
# Create table using connection without stable row IDs
tbl_without = db_without.create_table(
"without_stable_sync",
data=[{"id": i} for i in range(10)],
)
lance_ds_without = tbl_without.to_lance()
fragments_without = lance_ds_without.get_fragments()
assert len(fragments_without) > 0
assert fragments_without[0].metadata.row_id_meta is None
@pytest.mark.asyncio
async def test_create_table_stable_row_ids_table_level_override(tmp_path):
"""Test that stable_row_ids can be enabled/disabled at create_table level."""
import lance
# Connect without any stable row ID setting
db_default = await lancedb.connect_async(tmp_path)
# Connect with stable row IDs enabled at connection level
db_with_stable = await lancedb.connect_async(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"}
)
# Case 1: No connection setting, enable at table level
await db_default.create_table(
"table_level_enabled",
data=[{"id": i} for i in range(10)],
storage_options={"new_table_enable_stable_row_ids": "true"},
)
lance_ds = lance.dataset(tmp_path / "table_level_enabled.lance")
fragments = lance_ds.get_fragments()
assert len(fragments) > 0
assert fragments[0].metadata.row_id_meta is not None, (
"Table should have stable row IDs when enabled at table level"
)
# Case 2: Connection has stable row IDs, override with false at table level
await db_with_stable.create_table(
"table_level_disabled",
data=[{"id": i} for i in range(10)],
storage_options={"new_table_enable_stable_row_ids": "false"},
)
lance_ds = lance.dataset(tmp_path / "table_level_disabled.lance")
fragments = lance_ds.get_fragments()
assert len(fragments) > 0
assert fragments[0].metadata.row_id_meta is None, (
"Table should NOT have stable row IDs when disabled at table level"
)
def test_create_table_stable_row_ids_table_level_override_sync(tmp_path):
"""Test that stable_row_ids can be enabled/disabled at create_table level (sync)."""
# Connect without any stable row ID setting
db_default = lancedb.connect(tmp_path)
# Connect with stable row IDs enabled at connection level
db_with_stable = lancedb.connect(
tmp_path, storage_options={"new_table_enable_stable_row_ids": "true"}
)
# Case 1: No connection setting, enable at table level
tbl = db_default.create_table(
"table_level_enabled_sync",
data=[{"id": i} for i in range(10)],
storage_options={"new_table_enable_stable_row_ids": "true"},
)
lance_ds = tbl.to_lance()
fragments = lance_ds.get_fragments()
assert len(fragments) > 0
assert fragments[0].metadata.row_id_meta is not None, (
"Table should have stable row IDs when enabled at table level"
)
# Case 2: Connection has stable row IDs, override with false at table level
tbl = db_with_stable.create_table(
"table_level_disabled_sync",
data=[{"id": i} for i in range(10)],
storage_options={"new_table_enable_stable_row_ids": "false"},
)
lance_ds = tbl.to_lance()
fragments = lance_ds.get_fragments()
assert len(fragments) > 0
assert fragments[0].metadata.row_id_meta is None, (
"Table should NOT have stable row IDs when disabled at table level"
)
def test_open_table_sync(tmp_db: lancedb.DBConnection):
tmp_db.create_table("test", data=[{"id": 0}])
assert tmp_db.open_table("test").count_rows() == 1

View File

@@ -35,6 +35,7 @@ pub const LANCE_FILE_EXTENSION: &str = "lance";
pub const OPT_NEW_TABLE_STORAGE_VERSION: &str = "new_table_data_storage_version";
pub const OPT_NEW_TABLE_V2_MANIFEST_PATHS: &str = "new_table_enable_v2_manifest_paths";
pub const OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS: &str = "new_table_enable_stable_row_ids";
/// Controls how new tables should be created
#[derive(Clone, Debug, Default)]
@@ -48,6 +49,12 @@ pub struct NewTableConfig {
/// V2 manifest paths are more efficient than V2 manifest paths but are not
/// supported by old clients.
pub enable_v2_manifest_paths: Option<bool>,
/// Whether to enable stable row IDs for new tables
///
/// When enabled, row IDs remain stable after compaction, update, delete,
/// and merges. This is useful for materialized views and other use cases
/// that need to track source rows across these operations.
pub enable_stable_row_ids: Option<bool>,
}
/// Options specific to the listing database
@@ -87,6 +94,14 @@ impl ListingDatabaseOptions {
})
})
.transpose()?,
enable_stable_row_ids: map
.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS)
.map(|s| {
s.parse::<bool>().map_err(|_| Error::InvalidInput {
message: format!("enable_stable_row_ids must be a boolean, received {}", s),
})
})
.transpose()?,
};
// We just assume that any options that are not new table config options are storage options
let storage_options = map
@@ -94,6 +109,7 @@ impl ListingDatabaseOptions {
.filter(|(key, _)| {
key.as_str() != OPT_NEW_TABLE_STORAGE_VERSION
&& key.as_str() != OPT_NEW_TABLE_V2_MANIFEST_PATHS
&& key.as_str() != OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS
})
.map(|(key, value)| (key.clone(), value.clone()))
.collect();
@@ -118,6 +134,12 @@ impl DatabaseOptions for ListingDatabaseOptions {
enable_v2_manifest_paths.to_string(),
);
}
if let Some(enable_stable_row_ids) = self.new_table_config.enable_stable_row_ids {
map.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
enable_stable_row_ids.to_string(),
);
}
}
}
@@ -497,7 +519,7 @@ impl ListingDatabase {
fn extract_storage_overrides(
&self,
request: &CreateTableRequest,
) -> Result<(Option<LanceFileVersion>, Option<bool>)> {
) -> Result<(Option<LanceFileVersion>, Option<bool>, Option<bool>)> {
let storage_options = request
.write_options
.lance_write_params
@@ -518,7 +540,19 @@ impl ListingDatabase {
message: "enable_v2_manifest_paths must be a boolean".to_string(),
})?;
Ok((storage_version_override, v2_manifest_override))
let stable_row_ids_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS))
.map(|s| s.parse::<bool>())
.transpose()
.map_err(|_| Error::InvalidInput {
message: "enable_stable_row_ids must be a boolean".to_string(),
})?;
Ok((
storage_version_override,
v2_manifest_override,
stable_row_ids_override,
))
}
/// Prepare write parameters for table creation
@@ -527,6 +561,7 @@ impl ListingDatabase {
request: &CreateTableRequest,
storage_version_override: Option<LanceFileVersion>,
v2_manifest_override: Option<bool>,
stable_row_ids_override: Option<bool>,
) -> lance::dataset::WriteParams {
let mut write_params = request
.write_options
@@ -571,6 +606,13 @@ impl ListingDatabase {
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
// Apply enable_stable_row_ids: table-level override takes precedence over connection config
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
{
write_params.enable_stable_row_ids = enable_stable_row_ids;
}
if matches!(&request.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
@@ -706,11 +748,15 @@ impl Database for ListingDatabase {
.clone()
.unwrap_or_else(|| self.table_uri(&request.name).unwrap());
let (storage_version_override, v2_manifest_override) =
let (storage_version_override, v2_manifest_override, stable_row_ids_override) =
self.extract_storage_overrides(&request)?;
let write_params =
self.prepare_write_params(&request, storage_version_override, v2_manifest_override);
let write_params = self.prepare_write_params(
&request,
storage_version_override,
v2_manifest_override,
stable_row_ids_override,
);
let data_schema = request.data.arrow_schema();
@@ -921,7 +967,7 @@ impl Database for ListingDatabase {
mod tests {
use super::*;
use crate::connection::ConnectRequest;
use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest};
use crate::database::{CreateTableData, CreateTableMode, CreateTableRequest, WriteOptions};
use crate::table::{Table, TableDefinition};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
@@ -1621,4 +1667,267 @@ mod tests {
// Cloned table should have all 8 rows from the latest version
assert_eq!(cloned_table.count_rows(None).await.unwrap(), 8);
}
#[tokio::test]
async fn test_create_table_with_stable_row_ids_connection_level() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
// Create database with stable row IDs enabled at connection level
let mut options = HashMap::new();
options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"true".to_string(),
);
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
// Verify the config was parsed correctly
assert_eq!(db.new_table_config.enable_stable_row_ids, Some(true));
// Create a table - it should inherit the stable row IDs setting
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch)],
schema.clone(),
));
let table = db
.create_table(CreateTableRequest {
name: "test_stable".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options: Default::default(),
location: None,
})
.await
.unwrap();
// Verify table was created successfully
assert_eq!(table.count_rows(None).await.unwrap(), 3);
}
#[tokio::test]
async fn test_create_table_with_stable_row_ids_table_level() {
let (_tempdir, db) = setup_database().await;
// Verify connection has no stable row IDs config
assert_eq!(db.new_table_config.enable_stable_row_ids, None);
// Create a table with stable row IDs enabled at table level via storage_options
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch)],
schema.clone(),
));
let mut storage_options = HashMap::new();
storage_options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"true".to_string(),
);
let write_options = WriteOptions {
lance_write_params: Some(lance::dataset::WriteParams {
store_params: Some(lance::io::ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
}),
..Default::default()
}),
};
let table = db
.create_table(CreateTableRequest {
name: "test_stable_table_level".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options,
location: None,
})
.await
.unwrap();
// Verify table was created successfully
assert_eq!(table.count_rows(None).await.unwrap(), 3);
}
#[tokio::test]
async fn test_create_table_stable_row_ids_table_overrides_connection() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
// Create database with stable row IDs enabled at connection level
let mut options = HashMap::new();
options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"true".to_string(),
);
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
read_consistency_interval: None,
session: None,
};
let db = ListingDatabase::connect_with_options(&request)
.await
.unwrap();
assert_eq!(db.new_table_config.enable_stable_row_ids, Some(true));
// Create table with stable row IDs disabled at table level (overrides connection)
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader = Box::new(arrow_array::RecordBatchIterator::new(
vec![Ok(batch)],
schema.clone(),
));
let mut storage_options = HashMap::new();
storage_options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"false".to_string(),
);
let write_options = WriteOptions {
lance_write_params: Some(lance::dataset::WriteParams {
store_params: Some(lance::io::ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
}),
..Default::default()
}),
};
let table = db
.create_table(CreateTableRequest {
name: "test_override".to_string(),
namespace: vec![],
data: CreateTableData::Data(reader),
mode: CreateTableMode::Create,
write_options,
location: None,
})
.await
.unwrap();
// Verify table was created successfully
assert_eq!(table.count_rows(None).await.unwrap(), 3);
}
#[tokio::test]
async fn test_stable_row_ids_invalid_value() {
let tempdir = tempdir().unwrap();
let uri = tempdir.path().to_str().unwrap();
// Try to create database with invalid stable row IDs value
let mut options = HashMap::new();
options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"not_a_boolean".to_string(),
);
let request = ConnectRequest {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
options,
read_consistency_interval: None,
session: None,
};
let result = ListingDatabase::connect_with_options(&request).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
Error::InvalidInput { message } if message.contains("enable_stable_row_ids must be a boolean")
));
}
#[test]
fn test_stable_row_ids_config_serialization() {
// Test that ListingDatabaseOptions correctly serializes stable_row_ids
let mut options = HashMap::new();
options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"true".to_string(),
);
// Parse the options
let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap();
assert_eq!(
db_options.new_table_config.enable_stable_row_ids,
Some(true)
);
// Serialize back to map
let mut serialized = HashMap::new();
db_options.serialize_into_map(&mut serialized);
assert_eq!(
serialized.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS),
Some(&"true".to_string())
);
}
#[test]
fn test_stable_row_ids_config_parse_false() {
let mut options = HashMap::new();
options.insert(
OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS.to_string(),
"false".to_string(),
);
let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap();
assert_eq!(
db_options.new_table_config.enable_stable_row_ids,
Some(false)
);
}
#[test]
fn test_stable_row_ids_config_not_set() {
let options = HashMap::new();
let db_options = ListingDatabaseOptions::parse_from_map(&options).unwrap();
assert_eq!(db_options.new_table_config.enable_stable_row_ids, None);
}
}