diff --git a/docs/src/js/classes/Table.md b/docs/src/js/classes/Table.md index 87582376a..ca940e819 100644 --- a/docs/src/js/classes/Table.md +++ b/docs/src/js/classes/Table.md @@ -690,6 +690,31 @@ of the given query *** +### setUnenforcedPrimaryKey() + +```ts +abstract setUnenforcedPrimaryKey(columns): Promise +``` + +Set the unenforced primary key for this table to a single column. + +"Unenforced" means LanceDB does not check uniqueness on writes; the +column is recorded in the schema as the primary key for use by features +such as `merge_insert`. Only single-column primary keys are supported, +and the key cannot be changed once set. + +#### Parameters + +* **columns**: `string` \| `string`[] + The primary key column. A one-element + array is also accepted; passing more than one column is rejected. + +#### Returns + +`Promise`<`void`> + +*** + ### stats() ```ts diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 36c616438..882733df6 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -2348,3 +2348,52 @@ describe("when creating a table with Float32Array vectors", () => { expect((fsl.children[0].type as Float32).precision).toBe(1); }); }); + +describe("setUnenforcedPrimaryKey", () => { + let tmpDir: tmp.DirResult; + + beforeEach(() => { + tmpDir = tmp.dirSync({ unsafeCleanup: true }); + }); + afterEach(() => tmpDir.removeCallback()); + + it("sets a single-column primary key (string or one-element array)", async () => { + const conn = await connect(tmpDir.name); + const schema = new arrow.Schema([ + new arrow.Field("id", new arrow.Int64(), false), + ]); + const t1 = await conn.createEmptyTable("t1", schema); + await t1.setUnenforcedPrimaryKey("id"); + + const t2 = await conn.createEmptyTable("t2", schema); + await t2.setUnenforcedPrimaryKey(["id"]); + }); + + it("rejects a compound primary key", async () => { + const conn = await connect(tmpDir.name); + const table = await conn.createEmptyTable( + "t", + new 
arrow.Schema([ + new arrow.Field("id", new arrow.Int64(), false), + new arrow.Field("name", new arrow.Utf8(), false), + ]), + ); + await expect( + table.setUnenforcedPrimaryKey(["id", "name"]), + ).rejects.toThrow(); + }); + + it("rejects changing the primary key once set", async () => { + const conn = await connect(tmpDir.name); + const table = await conn.createEmptyTable( + "t", + new arrow.Schema([ + new arrow.Field("id", new arrow.Int64(), false), + new arrow.Field("name", new arrow.Utf8(), false), + ]), + ); + await table.setUnenforcedPrimaryKey("id"); + await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow(); + await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow(); + }); +}); diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 264b8897d..56cc127ce 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -449,6 +449,18 @@ export abstract class Table { * containing the new version number of the table after dropping the columns. */ abstract dropColumns(columnNames: string[]): Promise; + /** + * Set the unenforced primary key for this table to a single column. + * + * "Unenforced" means LanceDB does not check uniqueness on writes; the + * column is recorded in the schema as the primary key for use by features + * such as `merge_insert`. Only single-column primary keys are supported, + * and the key cannot be changed once set. + * @param {string | string[]} columns The primary key column. A one-element + * array is also accepted; passing more than one column is rejected. + * @returns {Promise} + */ + abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise; /** Retrieve the version of the table */ abstract version(): Promise; @@ -897,6 +909,11 @@ export class LocalTable extends Table { return await this.inner.dropColumns(columnNames); } + async setUnenforcedPrimaryKey(columns: string | string[]): Promise { + const cols = typeof columns === "string" ? 
[columns] : columns; + return await this.inner.setUnenforcedPrimaryKey(cols); + } + async version(): Promise { return await this.inner.version(); } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 6100d7bd7..94e7bf630 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -344,6 +344,14 @@ impl Table { Ok(res.into()) } + #[napi(catch_unwind)] + pub async fn set_unenforced_primary_key(&self, columns: Vec) -> napi::Result<()> { + self.inner_ref()? + .set_unenforced_primary_key(columns) + .await + .default_error() + } + #[napi(catch_unwind)] pub async fn version(&self) -> napi::Result { self.inner_ref()? diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 8839af156..2ba2b6fb8 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -217,6 +217,7 @@ class Table: async def uri(self) -> str: ... async def initial_storage_options(self) -> Optional[Dict[str, str]]: ... async def latest_storage_options(self) -> Optional[Dict[str, str]]: ... + async def set_unenforced_primary_key(self, columns: List[str]) -> None: ... @property def tags(self) -> Tags: ... def query(self) -> Query: ... 
diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index f4237110d..64aca21f1 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -655,6 +655,10 @@ class RemoteTable(Table): def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult: return LOOP.run(self._table.drop_columns(columns)) + def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None: + """Not supported on LanceDB Cloud.""" + return LOOP.run(self._table.set_unenforced_primary_key(columns)) + def drop_index(self, index_name: str): return LOOP.run(self._table.drop_index(index_name)) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index c00c14f9c..bc3b22b1c 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -3263,6 +3263,11 @@ class LanceTable(Table): def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult: return LOOP.run(self._table.drop_columns(columns)) + def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None: + """Set the unenforced primary key. See + [`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key].""" + return LOOP.run(self._table.set_unenforced_primary_key(columns)) + def uses_v2_manifest_paths(self) -> bool: """ Check if the table is using the new v2 manifest paths. @@ -3808,6 +3813,31 @@ class AsyncTable: Any attempt to use the table after it has been closed will raise an error.""" return self._inner.close() + async def set_unenforced_primary_key( + self, columns: Union[str, Iterable[str]] + ) -> None: + """Set the unenforced primary key for this table to a single + column. + + "Unenforced" means LanceDB does not check uniqueness on writes; the + column is recorded in the schema as the primary key so that + features such as `merge_insert` can use it. The key cannot be + changed once set. 
+ + Parameters + ---------- + columns : str or Iterable[str] + The primary key column. A one-element iterable is also accepted; + empty or compound (multi-column) input is rejected. The dtype + must be one of: int32, int64, utf8, large_utf8, binary, + large_binary, fixed_size_binary. + """ + if isinstance(columns, str): + columns = [columns] + else: + columns = list(columns) + await self._inner.set_unenforced_primary_key(columns) + @property def name(self) -> str: """The name of the table.""" diff --git a/python/python/tests/test_primary_key.py b/python/python/tests/test_primary_key.py new file mode 100644 index 000000000..b32e24f16 --- /dev/null +++ b/python/python/tests/test_primary_key.py @@ -0,0 +1,79 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +"""Tests for Table.set_unenforced_primary_key.""" + +from datetime import timedelta + +import lancedb +import pyarrow as pa +import pytest + + +def _empty_table(path, schema): + db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0)) + return db.create_table("t", schema=schema) + + +def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path): + schema = pa.schema([pa.field("id", pa.int64(), nullable=False)]) + + # Bare string. + table = _empty_table(tmp_path / "s", schema) + table.set_unenforced_primary_key("id") + + # One-element list. + table = _empty_table(tmp_path / "l", schema) + table.set_unenforced_primary_key(["id"]) + + +def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path): + table = _empty_table( + tmp_path, + pa.schema( + [ + pa.field("a", pa.utf8(), nullable=False), + pa.field("b", pa.int64(), nullable=False), + ] + ), + ) + # Compound keys are not supported. + with pytest.raises(Exception, match="compound"): + table.set_unenforced_primary_key(["a", "b"]) + # Empty input. 
+ with pytest.raises(Exception, match="required"): + table.set_unenforced_primary_key([]) + + +def test_set_unenforced_primary_key_is_immutable(tmp_path): + table = _empty_table( + tmp_path, + pa.schema( + [ + pa.field("a", pa.utf8(), nullable=False), + pa.field("b", pa.int64(), nullable=False), + ] + ), + ) + table.set_unenforced_primary_key("a") + # The primary key cannot be changed or re-set once installed. + with pytest.raises(Exception, match="already set"): + table.set_unenforced_primary_key("b") + with pytest.raises(Exception, match="already set"): + table.set_unenforced_primary_key("a") + + +def test_set_unenforced_primary_key_validates(tmp_path): + table = _empty_table( + tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)]) + ) + # Unknown column. + with pytest.raises(Exception, match="not found"): + table.set_unenforced_primary_key("nonexistent") + + # Unsupported dtype (Float32 not in the supported set). + bad = _empty_table( + tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)]) + ) + with pytest.raises(Exception, match="not supported"): + bad.set_unenforced_primary_key("id") diff --git a/python/src/table.rs b/python/src/table.rs index 9ac5af807..0c6298779 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -805,6 +805,19 @@ impl Table { }) } + pub fn set_unenforced_primary_key<'a>( + self_: PyRef<'a, Self>, + columns: Vec, + ) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + inner + .set_unenforced_primary_key(columns) + .await + .infer_error() + }) + } + pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 34c513bcd..ad9a7a303 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -1667,6 +1667,12 @@ impl BaseTable for RemoteTable { 
Ok(merge_insert_response) } + async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> { + Err(Error::NotSupported { + message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(), + }) + } + async fn tags(&self) -> Result> { Ok(Box::new(RemoteTags { inner: self })) } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 2f340436e..d32403a4a 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -74,6 +74,7 @@ pub(crate) mod dataset; pub mod delete; pub mod merge; pub mod optimize; +mod primary_key; pub mod query; pub mod schema_evolution; pub mod update; @@ -345,6 +346,20 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { params: MergeInsertBuilder, new_data: Box, ) -> Result; + /// Set the unenforced primary key for the table to a single column. + /// + /// "Unenforced" means LanceDB does not check uniqueness on writes; the + /// column is recorded in the schema as the primary key for use by + /// features such as `merge_insert`. Only single-column primary keys are + /// supported, and the key cannot be changed once set. + /// + /// The default implementation returns `NotSupported`; table types + /// backed by a Lance dataset override it. + async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> { + Err(Error::NotSupported { + message: "set_unenforced_primary_key is not supported on this table type".into(), + }) + } /// Gets the table tag manager. async fn tags(&self) -> Result>; /// Optimize the dataset. @@ -1063,6 +1078,28 @@ impl Table { self.inner.drop_columns(columns).await } + /// Set the unenforced primary key for this table to a single column. + /// + /// "Unenforced" means LanceDB does not check uniqueness on writes; the + /// column is recorded in the schema as the primary key so that features + /// such as `merge_insert` can use it. 
+ /// + /// Only single-column primary keys are supported, and the key cannot be + /// changed once set — calling this on a table that already has an + /// unenforced primary key fails. `columns` is an iterable for binding + /// ergonomics but must yield exactly one column: + /// + /// - `table.set_unenforced_primary_key(["id"])` + pub async fn set_unenforced_primary_key(&self, columns: I) -> Result<()> + where + I: IntoIterator, + S: Into, + { + let owned: Vec = columns.into_iter().map(Into::into).collect(); + let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect(); + self.inner.set_unenforced_primary_key(&borrowed).await + } + /// Retrieve the version of the table /// /// LanceDb supports versioning. Every operation that modifies the table increases @@ -2469,6 +2506,10 @@ impl BaseTable for NativeTable { merge::execute_merge_insert(self, params, new_data).await } + async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> { + primary_key::set_unenforced_primary_key(self, columns).await + } + /// Delete rows from the table async fn delete(&self, predicate: &str) -> Result { // Delegate to the submodule implementation @@ -2770,6 +2811,7 @@ mod tests { use futures::TryStreamExt; use lance::Dataset; use lance::io::{ObjectStoreParams, WrappingObjectStore}; + use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; use tempfile::tempdir; use super::*; @@ -3864,6 +3906,152 @@ mod tests { ); } + #[tokio::test] + async fn test_set_unenforced_primary_key() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("score", DataType::Float64, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + 
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])), + ], + ) + .unwrap(); + let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn.create_table("t", reader).execute().await.unwrap(); + + // Reject empty input. + let err = table + .set_unenforced_primary_key(Vec::<&str>::new()) + .await + .expect_err("empty input should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + + // Reject compound (multi-column) input. + let err = table + .set_unenforced_primary_key(["id", "name"]) + .await + .expect_err("compound primary key should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + + // Reject unknown column. + let err = table + .set_unenforced_primary_key(["nonexistent"]) + .await + .expect_err("nonexistent column should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + + // Reject unsupported dtype (Float64). + let err = table + .set_unenforced_primary_key(["score"]) + .await + .expect_err("Float64 should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + + // None of the rejected calls set a primary key. + let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema; + assert!(lance_schema.unenforced_primary_key().is_empty()); + + // Happy path: set the primary key to "id". + table.set_unenforced_primary_key(["id"]).await.unwrap(); + let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema; + let pk = lance_schema.unenforced_primary_key(); + assert_eq!(pk.len(), 1); + assert_eq!(pk[0].name, "id"); + // Position metadata is 1-indexed. 
+ assert_eq!( + pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION), + Some(&"1".to_string()) + ); + + // The primary key is immutable: re-setting it is rejected, whether to + // the same column or a different one. + let err = table + .set_unenforced_primary_key(["id"]) + .await + .expect_err("re-setting the same primary key should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + let err = table + .set_unenforced_primary_key(["name"]) + .await + .expect_err("changing the primary key should be rejected"); + assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err); + + // The primary key is unchanged after the rejected calls. + let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema; + let pk = lance_schema.unenforced_primary_key(); + assert_eq!(pk.len(), 1); + assert_eq!(pk[0].name, "id"); + } + + #[tokio::test] + async fn test_set_unenforced_primary_key_concurrent() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let reader: Box = + Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())); + + // A long read-consistency interval keeps each handle pinned to the + // version it opened, so the second handle commits against a stale + // base — the same situation as two processes racing. + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(3600)) + .execute() + .await + .unwrap(); + conn.create_table("t", reader).execute().await.unwrap(); + + let table_a = conn.open_table("t").execute().await.unwrap(); + let table_b = conn.open_table("t").execute().await.unwrap(); + + // Handle A sets the primary key first. 
+ table_a.set_unenforced_primary_key(["id"]).await.unwrap(); + + // Handle B committed against a stale base that had no primary key, so + // its own up-front check did not see A's key. The commit itself must + // still fail rather than silently overriding A's primary key. (The + // cross-process race on a *different* column is caught by the Lance + // commit layer.) + let err = table_b + .set_unenforced_primary_key(["id"]) + .await + .expect_err("concurrent primary key commit on a stale base should fail"); + assert!( + !matches!(err, Error::InvalidInput { .. }), + "expected a commit-time conflict, not an up-front input error: {:?}", + err + ); + + // The committed primary key is exactly what A set — no corruption. + let fresh = conn.open_table("t").execute().await.unwrap(); + let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema; + let pk = lance_schema.unenforced_primary_key(); + assert_eq!(pk.len(), 1); + assert_eq!(pk[0].name, "id"); + } + #[tokio::test] pub async fn test_stats() { let tmp_dir = tempdir().unwrap(); diff --git a/rust/lancedb/src/table/primary_key.rs b/rust/lancedb/src/table/primary_key.rs new file mode 100644 index 000000000..8a7b48efa --- /dev/null +++ b/rust/lancedb/src/table/primary_key.rs @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! Table-level unenforced primary key support. +//! +//! [`set_unenforced_primary_key`] records a column as the table's primary key +//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not +//! check uniqueness on write; the key is metadata for features such as +//! `merge_insert` to consume. +//! +//! Only a single-column primary key is supported, and the key cannot be +//! changed once set. 
+ +use arrow_schema::DataType; +use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION}; + +use crate::error::{Error, Result}; +use crate::table::NativeTable; + +/// Set the unenforced primary key on `table` to the single column in `columns`. +/// +/// Fails if `columns` is not exactly one column (compound primary keys are not +/// supported), if the column does not exist or has an unsupported dtype, or if +/// the table already has an unenforced primary key (changing the primary key +/// is not supported). +pub(super) async fn set_unenforced_primary_key( + table: &NativeTable, + columns: &[&str], +) -> Result<()> { + table.dataset.ensure_mutable()?; + + if columns.is_empty() { + return Err(Error::InvalidInput { + message: "set_unenforced_primary_key: a column is required".into(), + }); + } + if columns.len() > 1 { + return Err(Error::InvalidInput { + message: format!( + "set_unenforced_primary_key: compound primary keys are not supported, got {} columns", + columns.len() + ), + }); + } + let column = columns[0]; + + let updates = { + let dataset = table.dataset.get().await?; + let schema = dataset.schema(); + + // The primary key is immutable once set. The Lance commit layer is the + // source of truth for this (it also covers the concurrent-writer race); + // this check just fails fast with a clear message. + if !schema.unenforced_primary_key().is_empty() { + return Err(Error::InvalidInput { + message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(), + }); + } + + let field = schema.field(column).ok_or_else(|| Error::InvalidInput { + message: format!( + "set_unenforced_primary_key: column '{}' not found on table", + column + ), + })?; + if !is_supported_pk_dtype(&field.data_type()) { + return Err(Error::InvalidInput { + message: format!( + "set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. 
Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary", + column, + field.data_type() + ), + }); + } + + // Position metadata is 1-indexed; `Schema::unenforced_primary_key` + // treats position 0 as a legacy "no specific position" fallback. + let mut metadata = field.metadata.clone(); + metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY); + metadata.insert( + LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(), + "1".to_string(), + ); + vec![(field_id_to_u32(field.id, &field.name)?, metadata)] + }; + + let mut dataset = (*table.dataset.get().await?).clone(); + dataset.replace_field_metadata(updates).await?; + table.dataset.update(dataset); + Ok(()) +} + +fn field_id_to_u32(id: i32, name: &str) -> Result { + u32::try_from(id).map_err(|_| Error::Runtime { + message: format!( + "internal: field '{}' has unexpected negative field id {}", + name, id + ), + }) +} + +fn is_supported_pk_dtype(dtype: &DataType) -> bool { + matches!( + dtype, + DataType::Int32 + | DataType::Int64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary + | DataType::FixedSizeBinary(_) + ) +}