mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-26 08:20:39 +00:00
feat: support setting unenforced primary key (#3394)
## Summary Adds `Table::set_unenforced_primary_key` — records a single column as the table's unenforced primary key in Lance schema field metadata. "Unenforced" means LanceDB does not check uniqueness on write; the key is metadata that `merge_insert` consumes. - Single-column only; the column must exist and have a supported dtype (Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary). The API accepts an iterable for binding ergonomics but requires exactly one column — compound keys are rejected. - The primary key is immutable: calling this on a table that already has an unenforced primary key is rejected. Concurrent writers racing to set the key fail at commit time rather than silently overriding it. - `RemoteTable` returns `NotSupported`. - Bindings: Python (`AsyncTable`, `LanceTable`, `RemoteTable`) and TypeScript (`Table.setUnenforcedPrimaryKey`). ## Context Split out from #3354 per review feedback, so the unenforced primary key and the `merge_insert` sharding spec land as separate reviewable PRs. No Lance dependency bump — `main` is already on v7.0.0-beta.10, which includes the field-metadata round-trip fix the API relies on. Enforcing primary-key immutability at the Lance commit layer (so the cross-column concurrent race is also rejected) is a companion Lance change: lance-format/lance#6810.
This commit is contained in:
@@ -690,6 +690,31 @@ of the given query
|
||||
|
||||
***
|
||||
|
||||
### setUnenforcedPrimaryKey()
|
||||
|
||||
```ts
|
||||
abstract setUnenforcedPrimaryKey(columns): Promise<void>
|
||||
```
|
||||
|
||||
Set the unenforced primary key for this table to a single column.
|
||||
|
||||
"Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
column is recorded in the schema as the primary key for use by features
|
||||
such as `merge_insert`. Only single-column primary keys are supported,
|
||||
and the key cannot be changed once set.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **columns**: `string` \| `string`[]
|
||||
The primary key column. A one-element
|
||||
array is also accepted; passing more than one column is rejected.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### stats()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -2348,3 +2348,52 @@ describe("when creating a table with Float32Array vectors", () => {
|
||||
expect((fsl.children[0].type as Float32).precision).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("setUnenforcedPrimaryKey", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
it("sets a single-column primary key (string or one-element array)", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const schema = new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
]);
|
||||
const t1 = await conn.createEmptyTable("t1", schema);
|
||||
await t1.setUnenforcedPrimaryKey("id");
|
||||
|
||||
const t2 = await conn.createEmptyTable("t2", schema);
|
||||
await t2.setUnenforcedPrimaryKey(["id"]);
|
||||
});
|
||||
|
||||
it("rejects a compound primary key", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
new arrow.Field("name", new arrow.Utf8(), false),
|
||||
]),
|
||||
);
|
||||
await expect(
|
||||
table.setUnenforcedPrimaryKey(["id", "name"]),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("rejects changing the primary key once set", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
new arrow.Field("name", new arrow.Utf8(), false),
|
||||
]),
|
||||
);
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
|
||||
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -449,6 +449,18 @@ export abstract class Table {
|
||||
* containing the new version number of the table after dropping the columns.
|
||||
*/
|
||||
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
|
||||
/**
|
||||
* Set the unenforced primary key for this table to a single column.
|
||||
*
|
||||
* "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
* column is recorded in the schema as the primary key for use by features
|
||||
* such as `merge_insert`. Only single-column primary keys are supported,
|
||||
* and the key cannot be changed once set.
|
||||
* @param {string | string[]} columns The primary key column. A one-element
|
||||
* array is also accepted; passing more than one column is rejected.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
|
||||
/** Retrieve the version of the table */
|
||||
|
||||
abstract version(): Promise<number>;
|
||||
@@ -897,6 +909,11 @@ export class LocalTable extends Table {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
|
||||
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
|
||||
const cols = typeof columns === "string" ? [columns] : columns;
|
||||
return await this.inner.setUnenforcedPrimaryKey(cols);
|
||||
}
|
||||
|
||||
async version(): Promise<number> {
|
||||
return await this.inner.version();
|
||||
}
|
||||
|
||||
@@ -344,6 +344,14 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn version(&self) -> napi::Result<i64> {
|
||||
self.inner_ref()?
|
||||
|
||||
@@ -217,6 +217,7 @@ class Table:
|
||||
async def uri(self) -> str: ...
|
||||
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
|
||||
@@ -655,6 +655,10 @@ class RemoteTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def drop_index(self, index_name: str):
|
||||
return LOOP.run(self._table.drop_index(index_name))
|
||||
|
||||
|
||||
@@ -3263,6 +3263,11 @@ class LanceTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
|
||||
"""Set the unenforced primary key. See
|
||||
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
"""
|
||||
Check if the table is using the new v2 manifest paths.
|
||||
@@ -3808,6 +3813,31 @@ class AsyncTable:
|
||||
Any attempt to use the table after it has been closed will raise an error."""
|
||||
return self._inner.close()
|
||||
|
||||
async def set_unenforced_primary_key(
|
||||
self, columns: Union[str, Iterable[str]]
|
||||
) -> None:
|
||||
"""Set the unenforced primary key for this table to the given
|
||||
ordered list of columns.
|
||||
|
||||
"Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
columns are recorded in the schema as the primary key so that
|
||||
features such as `merge_insert` can use them. Calling this again
|
||||
replaces any previously-set primary key.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
columns : str or Iterable[str]
|
||||
Either a single column name (single-column key) or an ordered
|
||||
iterable of column names (composite key). Each column dtype
|
||||
must be one of: int32, int64, utf8, large_utf8, binary,
|
||||
large_binary, fixed_size_binary.
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
else:
|
||||
columns = list(columns)
|
||||
await self._inner.set_unenforced_primary_key(columns)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table."""
|
||||
|
||||
79
python/python/tests/test_primary_key.py
Normal file
79
python/python/tests/test_primary_key.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for Table.set_unenforced_primary_key."""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
|
||||
|
||||
def _empty_table(path, schema):
|
||||
db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0))
|
||||
return db.create_table("t", schema=schema)
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
|
||||
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
|
||||
|
||||
# Bare string.
|
||||
table = _empty_table(tmp_path / "s", schema)
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# One-element list.
|
||||
table = _empty_table(tmp_path / "l", schema)
|
||||
table.set_unenforced_primary_key(["id"])
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path,
|
||||
pa.schema(
|
||||
[
|
||||
pa.field("a", pa.utf8(), nullable=False),
|
||||
pa.field("b", pa.int64(), nullable=False),
|
||||
]
|
||||
),
|
||||
)
|
||||
# Compound keys are not supported.
|
||||
with pytest.raises(Exception, match="compound"):
|
||||
table.set_unenforced_primary_key(["a", "b"])
|
||||
# Empty input.
|
||||
with pytest.raises(Exception, match="required"):
|
||||
table.set_unenforced_primary_key([])
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_is_immutable(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path,
|
||||
pa.schema(
|
||||
[
|
||||
pa.field("a", pa.utf8(), nullable=False),
|
||||
pa.field("b", pa.int64(), nullable=False),
|
||||
]
|
||||
),
|
||||
)
|
||||
table.set_unenforced_primary_key("a")
|
||||
# The primary key cannot be changed or re-set once installed.
|
||||
with pytest.raises(Exception, match="already set"):
|
||||
table.set_unenforced_primary_key("b")
|
||||
with pytest.raises(Exception, match="already set"):
|
||||
table.set_unenforced_primary_key("a")
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_validates(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)])
|
||||
)
|
||||
# Unknown column.
|
||||
with pytest.raises(Exception, match="not found"):
|
||||
table.set_unenforced_primary_key("nonexistent")
|
||||
|
||||
# Unsupported dtype (Float32 not in the supported set).
|
||||
bad = _empty_table(
|
||||
tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)])
|
||||
)
|
||||
with pytest.raises(Exception, match="not supported"):
|
||||
bad.set_unenforced_primary_key("id")
|
||||
@@ -805,6 +805,19 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_unenforced_primary_key<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
columns: Vec<String>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
|
||||
@@ -1667,6 +1667,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ pub(crate) mod dataset;
|
||||
pub mod delete;
|
||||
pub mod merge;
|
||||
pub mod optimize;
|
||||
mod primary_key;
|
||||
pub mod query;
|
||||
pub mod schema_evolution;
|
||||
pub mod update;
|
||||
@@ -345,6 +346,20 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<MergeResult>;
|
||||
/// Set the unenforced primary key for the table to a single column.
|
||||
///
|
||||
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
/// column is recorded in the schema as the primary key for use by
|
||||
/// features such as `merge_insert`. Only single-column primary keys are
|
||||
/// supported, and the key cannot be changed once set.
|
||||
///
|
||||
/// The default implementation returns `NotSupported`; table types
|
||||
/// backed by a Lance dataset override it.
|
||||
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_unenforced_primary_key is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
@@ -1063,6 +1078,28 @@ impl Table {
|
||||
self.inner.drop_columns(columns).await
|
||||
}
|
||||
|
||||
/// Set the unenforced primary key for this table to a single column.
|
||||
///
|
||||
/// "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
/// column is recorded in the schema as the primary key so that features
|
||||
/// such as `merge_insert` can use it.
|
||||
///
|
||||
/// Only single-column primary keys are supported, and the key cannot be
|
||||
/// changed once set — calling this on a table that already has an
|
||||
/// unenforced primary key fails. `columns` is an iterable for binding
|
||||
/// ergonomics but must yield exactly one column:
|
||||
///
|
||||
/// - `table.set_unenforced_primary_key(["id"])`
|
||||
pub async fn set_unenforced_primary_key<I, S>(&self, columns: I) -> Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
let owned: Vec<String> = columns.into_iter().map(Into::into).collect();
|
||||
let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
|
||||
self.inner.set_unenforced_primary_key(&borrowed).await
|
||||
}
|
||||
|
||||
/// Retrieve the version of the table
|
||||
///
|
||||
/// LanceDb supports versioning. Every operation that modifies the table increases
|
||||
@@ -2469,6 +2506,10 @@ impl BaseTable for NativeTable {
|
||||
merge::execute_merge_insert(self, params, new_data).await
|
||||
}
|
||||
|
||||
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
|
||||
primary_key::set_unenforced_primary_key(self, columns).await
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
// Delegate to the submodule implementation
|
||||
@@ -2770,6 +2811,7 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::Dataset;
|
||||
use lance::io::{ObjectStoreParams, WrappingObjectStore};
|
||||
use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION;
|
||||
use tempfile::tempdir;
|
||||
|
||||
use super::*;
|
||||
@@ -3864,6 +3906,152 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_unenforced_primary_key() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("name", DataType::Utf8, true),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3])),
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c"])),
|
||||
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0, 3.0])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject empty input.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(Vec::<&str>::new())
|
||||
.await
|
||||
.expect_err("empty input should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject compound (multi-column) input.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["id", "name"])
|
||||
.await
|
||||
.expect_err("compound primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject unknown column.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["nonexistent"])
|
||||
.await
|
||||
.expect_err("nonexistent column should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// Reject unsupported dtype (Float64).
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["score"])
|
||||
.await
|
||||
.expect_err("Float64 should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// None of the rejected calls set a primary key.
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
assert!(lance_schema.unenforced_primary_key().is_empty());
|
||||
|
||||
// Happy path: set the primary key to "id".
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
// Position metadata is 1-indexed.
|
||||
assert_eq!(
|
||||
pk[0].metadata.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION),
|
||||
Some(&"1".to_string())
|
||||
);
|
||||
|
||||
// The primary key is immutable: re-setting it is rejected, whether to
|
||||
// the same column or a different one.
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["id"])
|
||||
.await
|
||||
.expect_err("re-setting the same primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
let err = table
|
||||
.set_unenforced_primary_key(["name"])
|
||||
.await
|
||||
.expect_err("changing the primary key should be rejected");
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {:?}", err);
|
||||
|
||||
// The primary key is unchanged after the rejected calls.
|
||||
let lance_schema = table.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_unenforced_primary_key_concurrent() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(arrow_array::Int64Array::from(vec![1, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
let reader: Box<dyn arrow_array::RecordBatchReader + Send> =
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()));
|
||||
|
||||
// A long read-consistency interval keeps each handle pinned to the
|
||||
// version it opened, so the second handle commits against a stale
|
||||
// base — the same situation as two processes racing.
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(3600))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
let table_a = conn.open_table("t").execute().await.unwrap();
|
||||
let table_b = conn.open_table("t").execute().await.unwrap();
|
||||
|
||||
// Handle A sets the primary key first.
|
||||
table_a.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
|
||||
// Handle B committed against a stale base that had no primary key, so
|
||||
// its own up-front check did not see A's key. The commit itself must
|
||||
// still fail rather than silently overriding A's primary key. (The
|
||||
// cross-process race on a *different* column is caught by the Lance
|
||||
// commit layer.)
|
||||
let err = table_b
|
||||
.set_unenforced_primary_key(["id"])
|
||||
.await
|
||||
.expect_err("concurrent primary key commit on a stale base should fail");
|
||||
assert!(
|
||||
!matches!(err, Error::InvalidInput { .. }),
|
||||
"expected a commit-time conflict, not an up-front input error: {:?}",
|
||||
err
|
||||
);
|
||||
|
||||
// The committed primary key is exactly what A set — no corruption.
|
||||
let fresh = conn.open_table("t").execute().await.unwrap();
|
||||
let lance_schema = fresh.as_native().unwrap().manifest().await.unwrap().schema;
|
||||
let pk = lance_schema.unenforced_primary_key();
|
||||
assert_eq!(pk.len(), 1);
|
||||
assert_eq!(pk[0].name, "id");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_stats() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
|
||||
113
rust/lancedb/src/table/primary_key.rs
Normal file
113
rust/lancedb/src/table/primary_key.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Table-level unenforced primary key support.
|
||||
//!
|
||||
//! [`set_unenforced_primary_key`] records a column as the table's primary key
|
||||
//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not
|
||||
//! check uniqueness on write; the key is metadata for features such as
|
||||
//! `merge_insert` to consume.
|
||||
//!
|
||||
//! Only a single-column primary key is supported, and the key cannot be
|
||||
//! changed once set.
|
||||
|
||||
use arrow_schema::DataType;
|
||||
use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
|
||||
/// Set the unenforced primary key on `table` to the single column in `columns`.
|
||||
///
|
||||
/// Fails if `columns` is not exactly one column (compound primary keys are not
|
||||
/// supported), if the column does not exist or has an unsupported dtype, or if
|
||||
/// the table already has an unenforced primary key (changing the primary key
|
||||
/// is not supported).
|
||||
pub(super) async fn set_unenforced_primary_key(
|
||||
table: &NativeTable,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
if columns.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: a column is required".into(),
|
||||
});
|
||||
}
|
||||
if columns.len() > 1 {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: compound primary keys are not supported, got {} columns",
|
||||
columns.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
let column = columns[0];
|
||||
|
||||
let updates = {
|
||||
let dataset = table.dataset.get().await?;
|
||||
let schema = dataset.schema();
|
||||
|
||||
// The primary key is immutable once set. The Lance commit layer is the
|
||||
// source of truth for this (it also covers the concurrent-writer race);
|
||||
// this check just fails fast with a clear message.
|
||||
if !schema.unenforced_primary_key().is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(),
|
||||
});
|
||||
}
|
||||
|
||||
let field = schema.field(column).ok_or_else(|| Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' not found on table",
|
||||
column
|
||||
),
|
||||
})?;
|
||||
if !is_supported_pk_dtype(&field.data_type()) {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary",
|
||||
column,
|
||||
field.data_type()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Position metadata is 1-indexed; `Schema::unenforced_primary_key`
|
||||
// treats position 0 as a legacy "no specific position" fallback.
|
||||
let mut metadata = field.metadata.clone();
|
||||
metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY);
|
||||
metadata.insert(
|
||||
LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
|
||||
"1".to_string(),
|
||||
);
|
||||
vec![(field_id_to_u32(field.id, &field.name)?, metadata)]
|
||||
};
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset.replace_field_metadata(updates).await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn field_id_to_u32(id: i32, name: &str) -> Result<u32> {
|
||||
u32::try_from(id).map_err(|_| Error::Runtime {
|
||||
message: format!(
|
||||
"internal: field '{}' has unexpected negative field id {}",
|
||||
name, id
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
fn is_supported_pk_dtype(dtype: &DataType) -> bool {
|
||||
matches!(
|
||||
dtype,
|
||||
DataType::Int32
|
||||
| DataType::Int64
|
||||
| DataType::Utf8
|
||||
| DataType::LargeUtf8
|
||||
| DataType::Binary
|
||||
| DataType::LargeBinary
|
||||
| DataType::FixedSizeBinary(_)
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user