Compare commits

..

1 Commits

Author SHA1 Message Date
Brendan Clement
32c77879c9 feat(nodejs): surface skip_auto_cleanup on add and merge insert 2026-05-14 12:59:21 -07:00
42 changed files with 534 additions and 1960 deletions

494
Cargo.lock generated
View File

@@ -1204,6 +1204,26 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bincode"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
dependencies = [
"bincode_derive",
"serde",
"unty",
]
[[package]]
name = "bincode_derive"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"
dependencies = [
"virtue",
]
[[package]]
name = "bit-set"
version = "0.8.0"
@@ -1352,29 +1372,6 @@ version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "bytecheck"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0caa33a2c0edca0419d15ac723dff03f1956f7978329b1e3b5fdaaaed9d3ca8b"
dependencies = [
"bytecheck_derive",
"ptr_meta",
"rancor",
"simdutf8",
]
[[package]]
name = "bytecheck_derive"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89385e82b5d1821d2219e0b095efa2cc1f246cbf99080f3be46a1a85c0d392d9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "bytemuck"
version = "1.25.0"
@@ -2019,12 +2016,6 @@ version = "0.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52560adf09603e58c9a7ee1fe1dcb95a16927b17c127f0ac02d6e768a0e25bc1"
[[package]]
name = "daachorse"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db756b5eb7d81d31f31f660f4132f8cf5698de52fca144c143d0ae0cbb5f2e06"
[[package]]
name = "darling"
version = "0.20.11"
@@ -2954,6 +2945,70 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "encoding"
version = "0.2.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec"
dependencies = [
"encoding-index-japanese",
"encoding-index-korean",
"encoding-index-simpchinese",
"encoding-index-singlebyte",
"encoding-index-tradchinese",
]
[[package]]
name = "encoding-index-japanese"
version = "1.20141219.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91"
dependencies = [
"encoding_index_tests",
]
[[package]]
name = "encoding-index-korean"
version = "1.20141219.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81"
dependencies = [
"encoding_index_tests",
]
[[package]]
name = "encoding-index-simpchinese"
version = "1.20141219.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87a7194909b9118fc707194baa434a4e3b0fb6a5a757c73c3adb07aa25031f7"
dependencies = [
"encoding_index_tests",
]
[[package]]
name = "encoding-index-singlebyte"
version = "1.20141219.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3351d5acffb224af9ca265f435b859c7c01537c0849754d3db3fdf2bfe2ae84a"
dependencies = [
"encoding_index_tests",
]
[[package]]
name = "encoding-index-tradchinese"
version = "1.20141219.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd0e20d5688ce3cab59eb3ef3a2083a5c77bf496cb798dc6fcdb75f323890c18"
dependencies = [
"encoding_index_tests",
]
[[package]]
name = "encoding_index_tests"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569"
[[package]]
name = "encoding_rs"
version = "0.8.35"
@@ -3127,6 +3182,17 @@ dependencies = [
"subtle",
]
[[package]]
name = "filetime"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db"
dependencies = [
"cfg-if 1.0.4",
"libc",
"libredox",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
@@ -3212,8 +3278,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -3667,12 +3733,6 @@ dependencies = [
"serde_core",
]
[[package]]
name = "hashbrown"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a"
[[package]]
name = "heapify"
version = "0.2.0"
@@ -4426,8 +4486,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-arith",
@@ -4472,6 +4532,7 @@ dependencies = [
"lance-table",
"lance-tokenizer",
"log",
"moka",
"object_store",
"permutation",
"pin-project",
@@ -4494,11 +4555,12 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-ord",
@@ -4515,8 +4577,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrayref",
"paste",
@@ -4525,8 +4587,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4534,6 +4596,7 @@ dependencies = [
"async-trait",
"byteorder",
"bytes",
"chrono",
"datafusion-common",
"datafusion-sql",
"deepsize",
@@ -4542,6 +4605,7 @@ dependencies = [
"lance-arrow",
"libc",
"log",
"mock_instant",
"moka",
"num_cpus",
"object_store",
@@ -4561,8 +4625,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-array",
@@ -4586,14 +4650,15 @@ dependencies = [
"pin-project",
"prost",
"prost-build",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-datagen"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-array",
@@ -4611,8 +4676,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4637,7 +4702,9 @@ dependencies = [
"num-traits",
"prost",
"prost-build",
"prost-types",
"rand 0.9.4",
"snafu 0.9.0",
"strum 0.26.3",
"tokio",
"tracing",
@@ -4647,8 +4714,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4673,14 +4740,15 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"snafu 0.9.0",
"tokio",
"tracing",
]
[[package]]
name = "lance-index"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arc-swap",
"arrow",
@@ -4701,6 +4769,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"datafusion-sql",
"deepsize",
"dirs",
"fst",
@@ -4735,6 +4804,7 @@ dependencies = [
"serde",
"serde_json",
"smallvec",
"snafu 0.9.0",
"tempfile",
"tokio",
"tracing",
@@ -4744,8 +4814,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-arith",
@@ -4769,6 +4839,7 @@ dependencies = [
"lance-arrow",
"lance-core",
"lance-namespace",
"libc",
"log",
"moka",
"object_store",
@@ -4779,6 +4850,7 @@ dependencies = [
"prost",
"rand 0.9.4",
"serde",
"snafu 0.9.0",
"tempfile",
"tokio",
"tracing",
@@ -4787,8 +4859,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4804,21 +4876,22 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"async-trait",
"bytes",
"lance-core",
"lance-namespace-reqwest-client",
"serde",
"snafu 0.9.0",
]
[[package]]
name = "lance-namespace-impls"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4845,6 +4918,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"snafu 0.9.0",
"tokio",
"tower",
"tower-http 0.5.2",
@@ -4867,8 +4941,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow",
"arrow-array",
@@ -4907,8 +4981,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4919,8 +4993,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "7.0.0-beta.9"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.9#109ebf71fcfddef8faae1af519df339a330debfc"
version = "7.0.0-beta.7"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.7#f6932459689b5568c89baa435ff85a4abf067b45"
dependencies = [
"jieba-rs",
"lindera",
@@ -5041,7 +5115,6 @@ dependencies = [
"arrow",
"async-trait",
"bytes",
"datafusion-common",
"env_logger",
"futures",
"lance-core",
@@ -5161,61 +5234,134 @@ version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08"
dependencies = [
"bitflags 2.11.0",
"libc",
"plain",
"redox_syscall 0.7.4",
]
[[package]]
name = "lindera"
version = "3.0.7"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74cda79d7161e99b414e4d292ff673cc3f8d22f070d8be3b6185c033363a9216"
checksum = "50aba4ef41052280722f2120f65606b9218e8718032a3c752b953c4d8091f02e"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"csv",
"daachorse",
"kanaria",
"lindera-cc-cedict",
"lindera-dictionary",
"log",
"lindera-ipadic",
"lindera-ipadic-neologd",
"lindera-ko-dic",
"lindera-unidic",
"once_cell",
"percent-encoding",
"regex",
"serde",
"serde_json",
"serde_yaml_ng",
"strum 0.28.0",
"strum_macros 0.28.0",
"serde_yaml",
"strum 0.27.2",
"strum_macros 0.27.2",
"unicode-blocks",
"unicode-normalization",
"unicode-segmentation",
"url",
"yada",
]
[[package]]
name = "lindera-cc-cedict"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d77e7a0830fd60f23828ad914439997288c1d2cdd9e269be67f967c27b56350"
dependencies = [
"bincode",
"byteorder",
"lindera-dictionary",
"once_cell",
"tokio",
]
[[package]]
name = "lindera-dictionary"
version = "3.0.7"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2385456ca9fe87c29072c5f156b52fdd5e28d5b5738ddfb3979501dbd736530"
checksum = "489cc70922782af3fd397c0e130846caefe1c15b27c2211aac8f88a9f4590aaf"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"csv",
"daachorse",
"derive_builder",
"encoding",
"encoding_rs",
"encoding_rs_io",
"flate2",
"glob",
"log",
"md5",
"memmap2 0.9.10",
"num_cpus",
"once_cell",
"regex",
"rkyv",
"rand 0.9.4",
"reqwest 0.12.28",
"serde",
"serde_json",
"strum 0.28.0",
"strum_macros 0.28.0",
"tar",
"thiserror 2.0.18",
"tokio",
"yada",
]
[[package]]
name = "lindera-ipadic"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78870521431dfaf0f94ddd3484fa08367e9d354fc8c708572f2f00007225ddfa"
dependencies = [
"bincode",
"byteorder",
"lindera-dictionary",
"once_cell",
"tokio",
]
[[package]]
name = "lindera-ipadic-neologd"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abcb3dc3056e5c683e12c2c5e8d40076f7ecfd7bd46f5fc0e4ae9e58152b5d85"
dependencies = [
"bincode",
"byteorder",
"lindera-dictionary",
"once_cell",
"tokio",
]
[[package]]
name = "lindera-ko-dic"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e99316158bab14f0256d912055521ca784f76c63e7460db8a74775c5dc1f8bc2"
dependencies = [
"bincode",
"byteorder",
"lindera-dictionary",
"once_cell",
"tokio",
]
[[package]]
name = "lindera-unidic"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52672945166c14276bbba25e4ec79d7e126db1b503c0a6aa07ffc0141ae15cfa"
dependencies = [
"bincode",
"byteorder",
"lindera-dictionary",
"once_cell",
"tokio",
]
[[package]]
@@ -5372,6 +5518,12 @@ dependencies = [
"digest",
]
[[package]]
name = "md5"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0"
[[package]]
name = "mea"
version = "0.6.3"
@@ -5449,6 +5601,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "mock_instant"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dce6dd36094cac388f119d2e9dc82dc730ef91c32a6222170d630e5414b956e6"
[[package]]
name = "moka"
version = "0.12.15"
@@ -5525,26 +5683,6 @@ dependencies = [
"target-features",
]
[[package]]
name = "munge"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e17401f259eba956ca16491461b6e8f72913a0a114e39736ce404410f915a0c"
dependencies = [
"munge_macro",
]
[[package]]
name = "munge_macro"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "napi"
version = "3.8.3"
@@ -6190,7 +6328,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if 1.0.4",
"libc",
"redox_syscall",
"redox_syscall 0.5.18",
"smallvec",
"windows-link",
]
@@ -6462,6 +6600,12 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "plain"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "planus"
version = "0.3.1"
@@ -6948,26 +7092,6 @@ dependencies = [
"cc",
]
[[package]]
name = "ptr_meta"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b9a0cf95a1196af61d4f1cbdab967179516d9a4a4312af1f31948f8f6224a79"
dependencies = [
"ptr_meta_derive",
]
[[package]]
name = "ptr_meta_derive"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "pulp"
version = "0.22.2"
@@ -7178,15 +7302,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
[[package]]
name = "rancor"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a063ea72381527c2a0561da9c80000ef822bdd7c3241b1cc1b12100e3df081ee"
dependencies = [
"ptr_meta",
]
[[package]]
name = "rand"
version = "0.8.5"
@@ -7415,6 +7530,15 @@ dependencies = [
"bitflags 2.11.0",
]
[[package]]
name = "redox_syscall"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a"
dependencies = [
"bitflags 2.11.0",
]
[[package]]
name = "redox_users"
version = "0.5.2"
@@ -7487,15 +7611,6 @@ version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "rend"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cadadef317c2f20755a64d7fdc48f9e7178ee6b0e1f7fce33fa60f1d68a276e6"
dependencies = [
"bytecheck",
]
[[package]]
name = "reqsign-aliyun-oss"
version = "3.0.0"
@@ -7750,36 +7865,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rkyv"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73389e0c99e664f919275ab5b5b0471391fe9a8de61e1dff9b1eaf56a90f16e3"
dependencies = [
"bytecheck",
"bytes",
"hashbrown 0.17.1",
"indexmap 2.13.0",
"munge",
"ptr_meta",
"rancor",
"rend",
"rkyv_derive",
"tinyvec",
"uuid",
]
[[package]]
name = "rkyv_derive"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d2ed0b54125315fb36bd021e82d314d1c126548f871634b483f46b31d13cac6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "roaring"
version = "0.11.4"
@@ -8278,10 +8363,10 @@ dependencies = [
]
[[package]]
name = "serde_yaml_ng"
version = "0.10.0"
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4db627b98b36d4203a7b458cf3573730f2bb591b28871d916dfa9efabfd41f"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap 2.13.0",
"itoa",
@@ -8673,11 +8758,11 @@ dependencies = [
[[package]]
name = "strum"
version = "0.28.0"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9628de9b8791db39ceda2b119bbe13134770b56c138ec1d3af810d045c04f9bd"
checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf"
dependencies = [
"strum_macros 0.28.0",
"strum_macros 0.27.2",
]
[[package]]
@@ -8708,9 +8793,9 @@ dependencies = [
[[package]]
name = "strum_macros"
version = "0.28.0"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664"
checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@@ -8847,6 +8932,17 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tar"
version = "0.4.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22692a6476a21fa75fdfc11d452fda482af402c008cdbaf3476414e122040973"
dependencies = [
"filetime",
"libc",
"xattr",
]
[[package]]
name = "target-features"
version = "0.1.6"
@@ -9439,6 +9535,12 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "unty"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
[[package]]
name = "ureq"
version = "2.12.1"
@@ -9518,6 +9620,12 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "virtue"
version = "0.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"
[[package]]
name = "vsimd"
version = "0.8.0"
@@ -10212,6 +10320,16 @@ dependencies = [
"tap",
]
[[package]]
name = "xattr"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156"
dependencies = [
"libc",
"rustix",
]
[[package]]
name = "xet-client"
version = "1.5.2"
@@ -10371,6 +10489,12 @@ version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
[[package]]
name = "yada"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aed111bd9e48a802518765906cbdadf0b45afb72b9c81ab049a3b86252adffdd"
[[package]]
name = "yoke"
version = "0.8.1"

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -437,29 +437,6 @@ Open a table in the database.
***
### renameTable()
```ts
abstract renameTable(
oldName,
newName,
namespacePath?): Promise<void>
```
#### Parameters
* **oldName**: `string`
* **newName**: `string`
* **namespacePath?**: `string`[]
#### Returns
`Promise`&lt;`void`&gt;
***
### tableNames()
#### tableNames(options)

View File

@@ -1,173 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / Scannable
# Class: Scannable
A data source that can be scanned as a stream of Arrow `RecordBatch`es.
`Scannable` wraps the schema + optional row count + rescannable flag and
a callback that yields batches one at a time. It is passed to consumers
(e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that
need to pull data without materializing the full dataset in JS memory.
Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh
writer serializes each batch, and the Rust side decodes it with
`arrow_ipc::reader::StreamReader`. One batch is in flight at a time.
## Properties
### numRows
```ts
readonly numRows: null | number;
```
***
### rescannable
```ts
readonly rescannable: boolean;
```
***
### schema
```ts
readonly schema: Schema<any>;
```
## Methods
### fromFactory()
```ts
static fromFactory(
schema,
factory,
opts): Promise<Scannable>
```
Build a Scannable from an explicit schema and a factory that returns a
fresh batch iterator on each call.
The factory is invoked once per scan. Each iterator yields
`RecordBatch`es matching the declared schema. Use this when you need
direct control over the pull loop — for example, to wrap a streaming
source whose batches are produced lazily.
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
The Arrow schema of the produced batches.
* **factory**
Called at the start of each scan to produce a batch
iterator. Must be idempotent when `rescannable` is true.
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
Optional hints. `rescannable` defaults to `true`; set to
`false` if calling `factory()` twice would not reproduce the same data.
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromIterable()
```ts
static fromIterable(
schema,
iter,
opts): Promise<Scannable>
```
Build a Scannable from an iterable of `RecordBatch`es. `rescannable`
defaults to `false`. Pass an explicit schema so the consumer can
validate before any batch is pulled.
`opts.rescannable: true` is honest for replayable iterables (Arrays,
Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh
iterator each call). It is rejected for one-shot iterables (generators,
async generators, or already-an-iterator inputs) because their
`[Symbol.iterator]()` returns the same exhausted object on the second
scan. For replayable sources outside this shape, use
`fromFactory(schema, () => createIter(), { rescannable: true })`.
Note: when `opts.rescannable` is `true`, the constructor calls
`[Symbol.iterator]()` once on the input to perform the structural check.
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
* **iter**: `Iterable`&lt;`RecordBatch`&lt;`any`&gt;&gt; \| `AsyncIterable`&lt;`RecordBatch`&lt;`any`&gt;&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromRecordBatchReader()
```ts
static fromRecordBatchReader(reader, opts): Promise<Scannable>
```
Build a Scannable from an Arrow `RecordBatchReader`. A reader can only
be consumed once; `rescannable` defaults to `false`.
The reader must already be opened (via `.open()`) so its `.schema` is
populated. `RecordBatchReader.from(...)` returns an unopened reader.
`opts.rescannable: true` is rejected because `RecordBatchReader` is a
self-iterator (its `[Symbol.iterator]()` returns itself), and this
constructor does not call `reader.reset()` between scans, so a second
scan would always see an exhausted reader. For genuinely replayable
sources, use
`fromFactory(schema, () => openReader(), { rescannable: true })`,
which mints a fresh reader on each scan.
#### Parameters
* **reader**: `RecordBatchReader`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromTable()
```ts
static fromTable(table, opts): Promise<Scannable>
```
Build a Scannable from an in-memory Arrow `Table`. Always rescannable;
the table's batches are replayed on each scan.
The table's row count is authoritative: `opts.numRows` must either be
omitted or equal to `table.numRows`. `opts.rescannable` of `false` is
rejected because in-memory Tables are always rescannable.
#### Parameters
* **table**: `Table`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;

View File

@@ -32,7 +32,6 @@
- [PhraseQuery](classes/PhraseQuery.md)
- [Query](classes/Query.md)
- [QueryBase](classes/QueryBase.md)
- [Scannable](classes/Scannable.md)
- [Session](classes/Session.md)
- [StaticHeaderProvider](classes/StaticHeaderProvider.md)
- [Table](classes/Table.md)
@@ -87,7 +86,6 @@
- [RemovalStats](interfaces/RemovalStats.md)
- [RestNamespaceConfig](interfaces/RestNamespaceConfig.md)
- [RetryConfig](interfaces/RetryConfig.md)
- [ScannableOptions](interfaces/ScannableOptions.md)
- [ShuffleOptions](interfaces/ShuffleOptions.md)
- [SplitCalculatedOptions](interfaces/SplitCalculatedOptions.md)
- [SplitHashOptions](interfaces/SplitHashOptions.md)

View File

@@ -1,29 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ScannableOptions
# Interface: ScannableOptions
## Properties
### numRows?
```ts
optional numRows: number;
```
Hint about the number of rows. Not validated against the stream.
***
### rescannable?
```ts
optional rescannable: boolean;
```
Whether the source can be scanned more than once. Defaults to `true` for
`fromTable` / `fromFactory` and `false` for `fromIterable` /
`fromRecordBatchReader`.

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.0.0-beta.9</lance-core.version>
<lance-core.version>7.0.0-beta.7</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -81,16 +81,6 @@ describe("given a connection", () => {
await db.createTable("test4", [{ id: 1 }, { id: 2 }]);
});
it("should expose renameTable and reject on OSS listing DB", async () => {
await db.createTable("old_name", [{ id: 1 }]);
await expect(db.renameTable("old_name", "new_name")).rejects.toThrow(
"rename_table is not supported in LanceDB OSS",
);
await expect(db.tableNames()).resolves.toEqual(["old_name"]);
});
it("should fail if creating table twice, unless overwrite is true", async () => {
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
await expect(tbl.countRows()).resolves.toBe(2);

View File

@@ -1,438 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import {
Field,
Float16,
Int32,
type RecordBatch,
RecordBatchReader,
Schema,
tableToIPC,
} from "apache-arrow";
import { makeArrowTable, makeEmptyTable } from "../lancedb/arrow";
import { Scannable } from "../lancedb/scannable";
function makeTable() {
return makeArrowTable(
[
{ id: 1, name: "a" },
{ id: 2, name: "b" },
{ id: 3, name: "c" },
],
{ vectorColumns: {} },
);
}
async function makeReader(): Promise<RecordBatchReader> {
// `RecordBatchReader.from()` returns an unopened reader; `.schema` is only
// populated after `.open()`. Opening sync readers is synchronous.
const reader = RecordBatchReader.from(tableToIPC(makeTable()));
return reader.open() as RecordBatchReader;
}
describe("Scannable", () => {
describe("fromTable", () => {
test("reflects schema, numRows, and defaults rescannable=true", async () => {
const table = makeTable();
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(table.numRows);
expect(scannable.rescannable).toBe(true);
});
test("throws when opts.numRows does not match table.numRows", async () => {
await expect(
Scannable.fromTable(makeTable(), { numRows: 42 }),
).rejects.toThrow(/does not match table\.numRows/);
});
test("throws when opts.rescannable is false", async () => {
await expect(
Scannable.fromTable(makeTable(), { rescannable: false }),
).rejects.toThrow(/always rescannable/);
});
});
describe("fromRecordBatchReader", () => {
test("reflects schema and defaults numRows=null, rescannable=false", async () => {
const reader = await makeReader();
const scannable = await Scannable.fromRecordBatchReader(reader);
expect(scannable.schema).toBe(reader.schema);
expect(scannable.numRows).toBeNull();
expect(scannable.rescannable).toBe(false);
});
test("honors numRows override", async () => {
const scannable = await Scannable.fromRecordBatchReader(
await makeReader(),
{ numRows: 3 },
);
expect(scannable.numRows).toBe(3);
expect(scannable.rescannable).toBe(false);
});
test("rescannable: false explicit does not throw", async () => {
const reader = await makeReader();
const scannable = await Scannable.fromRecordBatchReader(reader, {
rescannable: false,
});
expect(scannable.rescannable).toBe(false);
});
test("throws when opts.rescannable is true", async () => {
const reader = await makeReader();
await expect(
Scannable.fromRecordBatchReader(reader, { rescannable: true }),
).rejects.toThrow(/does not accept rescannable/);
});
test("throws when opts.rescannable is true even alongside numRows", async () => {
const reader = await makeReader();
await expect(
Scannable.fromRecordBatchReader(reader, {
numRows: 3,
rescannable: true,
}),
).rejects.toThrow(/does not accept rescannable/);
});
});
describe("fromIterable", () => {
test("accepts a sync iterable of batches", async () => {
const table = makeTable();
const scannable = await Scannable.fromIterable(
table.schema,
table.batches,
);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBeNull();
expect(scannable.rescannable).toBe(false);
});
test("accepts an async iterable of batches", async () => {
const table = makeTable();
async function* generator(): AsyncGenerator<RecordBatch> {
for (const batch of table.batches) {
yield batch;
}
}
const scannable = await Scannable.fromIterable(table.schema, generator());
expect(scannable.schema).toBe(table.schema);
expect(scannable.rescannable).toBe(false);
});
describe("rescannable: true detection", () => {
// Replayable inputs: [Symbol.iterator]() / [Symbol.asyncIterator]()
// returns a fresh iterator each call. Must NOT throw.
test("Array passes (fresh ArrayIterator each call)", async () => {
const table = makeTable();
const scannable = await Scannable.fromIterable(
table.schema,
table.batches,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("Set passes (fresh SetIterator each call)", async () => {
const table = makeTable();
const set = new Set<RecordBatch>(table.batches);
const scannable = await Scannable.fromIterable(table.schema, set, {
rescannable: true,
});
expect(scannable.rescannable).toBe(true);
});
test("custom Iterable returning a fresh iterator passes", async () => {
const table = makeTable();
const replayable: Iterable<RecordBatch> = {
[Symbol.iterator]() {
return table.batches[Symbol.iterator]();
},
};
const scannable = await Scannable.fromIterable(
table.schema,
replayable,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("object with generator method passes (fresh generator each call)", async () => {
const table = makeTable();
const replayable: Iterable<RecordBatch> = {
*[Symbol.iterator]() {
for (const batch of table.batches) yield batch;
},
};
const scannable = await Scannable.fromIterable(
table.schema,
replayable,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("empty Array passes (replayable degenerate case)", async () => {
const schema = makeTable().schema;
const scannable = await Scannable.fromIterable(
schema,
[] as RecordBatch[],
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
// One-shot inputs: [Symbol.iterator]() / [Symbol.asyncIterator]()
// returns the same object, or the input is already-an-iterator.
// Must throw with a /one-shot/ message.
test("sync generator throws", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
await expect(
Scannable.fromIterable(table.schema, generator(), {
rescannable: true,
}),
).rejects.toThrow(/one-shot/);
});
test("async generator throws", async () => {
const table = makeTable();
async function* generator(): AsyncGenerator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
await expect(
Scannable.fromIterable(table.schema, generator(), {
rescannable: true,
}),
).rejects.toThrow(/one-shot/);
});
test("empty generator throws (one-shot degenerate case)", async () => {
const schema = makeTable().schema;
function* generator(): Generator<RecordBatch> {
// intentionally empty; yields nothing.
}
await expect(
Scannable.fromIterable(schema, generator(), { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("custom self-iterator throws", async () => {
const table = makeTable();
const batches = table.batches;
let i = 0;
const oneShot: Iterable<RecordBatch> & Iterator<RecordBatch> = {
[Symbol.iterator]() {
return this;
},
next() {
if (i >= batches.length) {
return { done: true, value: undefined };
}
return { done: false, value: batches[i++] };
},
};
await expect(
Scannable.fromIterable(table.schema, oneShot, { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("Array.values() (IterableIterator) throws", async () => {
const table = makeTable();
const iter = table.batches.values();
await expect(
Scannable.fromIterable(table.schema, iter, { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("raw iterator (only `.next`) throws", async () => {
const table = makeTable();
const batches = table.batches;
let i = 0;
const rawIter = {
next(): IteratorResult<RecordBatch> {
if (i >= batches.length) {
return { done: true, value: undefined };
}
return { done: false, value: batches[i++] };
},
};
await expect(
Scannable.fromIterable(
table.schema,
rawIter as unknown as Iterable<RecordBatch>,
{ rescannable: true },
),
).rejects.toThrow(/one-shot/);
});
// Edge: null/undefined must not crash the detection helper. The
// null check belongs to `normalizeIterator` and only fires when a
// scan starts.
test("null input does not crash detection at construction", async () => {
const schema = makeTable().schema;
await expect(
Scannable.fromIterable(
schema,
null as unknown as Iterable<RecordBatch>,
{
rescannable: true,
},
),
).resolves.toBeDefined();
});
test("undefined input does not crash detection at construction", async () => {
const schema = makeTable().schema;
await expect(
Scannable.fromIterable(
schema,
undefined as unknown as Iterable<RecordBatch>,
{ rescannable: true },
),
).resolves.toBeDefined();
});
// Default (rescannable omitted) skips the check entirely, so even
// pathological inputs construct without throwing here.
test("rescannable omitted skips detection entirely (generator passes)", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
const scannable = await Scannable.fromIterable(
table.schema,
generator(),
);
expect(scannable.rescannable).toBe(false);
});
test("rescannable: false explicit skips detection entirely (generator passes)", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
const scannable = await Scannable.fromIterable(
table.schema,
generator(),
{ rescannable: false },
);
expect(scannable.rescannable).toBe(false);
});
});
});
describe("fromFactory", () => {
test("defaults rescannable=true and does not invoke the factory eagerly", async () => {
const table = makeTable();
const factory = jest.fn(() => table.batches);
const scannable = await Scannable.fromFactory(table.schema, factory);
expect(scannable.schema).toBe(table.schema);
expect(scannable.rescannable).toBe(true);
expect(factory).not.toHaveBeenCalled();
});
test("honors rescannable and numRows overrides", async () => {
const table = makeTable();
const scannable = await Scannable.fromFactory(
table.schema,
() => table.batches,
{ numRows: 7, rescannable: false },
);
expect(scannable.numRows).toBe(7);
expect(scannable.rescannable).toBe(false);
});
});
describe("validation", () => {
test("throws when numRows is negative", async () => {
await expect(
Scannable.fromFactory(makeTable().schema, () => [], { numRows: -1 }),
).rejects.toThrow(/non-negative/);
});
test("throws when numRows is not an integer", async () => {
await expect(
Scannable.fromFactory(makeTable().schema, () => [], { numRows: 3.5 }),
).rejects.toThrow(/integer/);
});
});
describe("native handle", () => {
test("exposes a native handle via inner", async () => {
const scannable = await Scannable.fromTable(makeTable());
expect(scannable.inner).toBeDefined();
expect(typeof scannable.inner).toBe("object");
expect(scannable.inner).not.toBeNull();
});
});
// Schema-variety construction tests. Each asserts that construction
// succeeds against a richer Arrow schema, which transitively exercises
// schema serialization and the Rust-side `ipc_file_to_schema` for types
// beyond flat primitives.
describe("schema variety", () => {
test("accepts an empty table", async () => {
const schema = new Schema([new Field("id", new Int32(), true)]);
const table = makeEmptyTable(schema);
const scannable = await Scannable.fromTable(table);
expect(scannable.numRows).toBe(0);
expect(scannable.schema).toBe(table.schema);
});
test("accepts nested struct and list columns", async () => {
const table = makeArrowTable(
[
{ id: 1, point: { x: 0, y: 0 }, tags: ["a", "b"] },
{ id: 2, point: { x: 1, y: 2 }, tags: ["c"] },
],
{ vectorColumns: {} },
);
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(2);
});
test("accepts a FixedSizeList (vector) column", async () => {
const table = makeArrowTable(
[
{ id: 1, vec: [1, 2, 3] },
{ id: 2, vec: [4, 5, 6] },
],
{ vectorColumns: { vec: { type: new Float16() } } },
);
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(2);
});
test("accepts a table with many columns", async () => {
const row: Record<string, number> = {};
for (let i = 0; i < 50; i++) row[`c${i}`] = i;
const table = makeArrowTable([row, row], { vectorColumns: {} });
const scannable = await Scannable.fromTable(table);
expect(scannable.schema.fields.length).toBe(50);
expect(scannable.numRows).toBe(2);
});
});
});

View File

@@ -115,6 +115,12 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
await expect(table.countRows()).resolves.toBe(1);
});
it("should accept skipAutoCleanup on add()", async () => {
await table.add([{ id: 1 }], { skipAutoCleanup: true });
await table.add([{ id: 2 }], { skipAutoCleanup: true });
await expect(table.countRows()).resolves.toBe(2);
});
it("should let me close the table", async () => {
expect(table.isOpen()).toBe(true);
table.close();

View File

@@ -1291,18 +1291,6 @@ export async function fromRecordBatchToBuffer(
return Buffer.from(await writer.toUint8Array());
}
/**
* Create a buffer containing a single record batch using the Arrow IPC Stream
* serialization. Each call produces a self-contained Stream message (schema +
* batch + EOS) suitable for incremental decode by `arrow_ipc::reader::StreamReader`.
*/
export async function fromRecordBatchToStreamBuffer(
batch: RecordBatch,
): Promise<Buffer> {
const writer = RecordBatchStreamWriter.writeAll([batch]);
return Buffer.from(await writer.toUint8Array());
}
/**
* Serialize an Arrow Table into a buffer using the Arrow IPC Stream serialization
*

View File

@@ -296,12 +296,6 @@ export abstract class Connection {
*/
abstract dropTable(name: string, namespacePath?: string[]): Promise<void>;
abstract renameTable(
oldName: string,
newName: string,
namespacePath?: string[],
): Promise<void>;
/**
* Drop all tables in the database.
* @param {string[]} namespacePath The namespace path to drop tables from (defaults to root namespace).
@@ -615,14 +609,6 @@ export class LocalConnection extends Connection {
return this.inner.dropTable(name, namespacePath ?? []);
}
async renameTable(
oldName: string,
newName: string,
namespacePath?: string[],
): Promise<void> {
return this.inner.renameTable(oldName, newName, namespacePath ?? []);
}
async dropAllTables(namespacePath?: string[]): Promise<void> {
return this.inner.dropAllTables(namespacePath ?? []);
}

View File

@@ -126,7 +126,6 @@ export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
export * as embedding from "./embedding";
export { permutationBuilder, PermutationBuilder } from "./permutation";
export { Scannable, ScannableOptions } from "./scannable";
export * as rerankers from "./rerankers";
export {
SchemaLike,

View File

@@ -87,6 +87,23 @@ export class MergeInsertBuilder {
this.#schema,
);
}
/**
* Skip the automatic cleanup of old dataset versions that would otherwise
* run as part of this merge insert's commit. Forwards to
* `MergeInsertBuilder::skip_auto_cleanup` in lance-core.
*
* Useful for high-frequency writers that prefer to manage version cleanup
* themselves, or writers without delete permissions on the underlying storage.
*
* @param skip - If true, the auto-cleanup step is skipped at commit time.
*/
skipAutoCleanup(skip: boolean): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.skipAutoCleanup(skip),
this.#schema,
);
}
/**
* Executes the merge insert operation
*

View File

@@ -1,274 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import {
Table as ArrowTable,
RecordBatch,
RecordBatchReader,
Schema,
} from "apache-arrow";
import {
fromRecordBatchToStreamBuffer,
fromTableToBuffer,
makeEmptyTable,
} from "./arrow";
import { NapiScannable } from "./native.js";
export interface ScannableOptions {
/** Hint about the number of rows. Not validated against the stream. */
numRows?: number;
/**
* Whether the source can be scanned more than once. Defaults to `true` for
* `fromTable` / `fromFactory` and `false` for `fromIterable` /
* `fromRecordBatchReader`.
*/
rescannable?: boolean;
}
/**
* A data source that can be scanned as a stream of Arrow `RecordBatch`es.
*
* `Scannable` wraps the schema + optional row count + rescannable flag and
* a callback that yields batches one at a time. It is passed to consumers
* (e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that
* need to pull data without materializing the full dataset in JS memory.
*
* Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh
* writer serializes each batch, and the Rust side decodes it with
* `arrow_ipc::reader::StreamReader`. One batch is in flight at a time.
*/
export class Scannable {
readonly schema: Schema;
readonly numRows: number | null;
readonly rescannable: boolean;
/** @hidden */
private readonly native: NapiScannable;
private constructor(
native: NapiScannable,
schema: Schema,
numRows: number | null,
rescannable: boolean,
) {
this.native = native;
this.schema = schema;
this.numRows = numRows;
this.rescannable = rescannable;
}
/** @hidden Access the native handle for passing through to Rust consumers. */
get inner(): NapiScannable {
return this.native;
}
/**
* Build a Scannable from an explicit schema and a factory that returns a
* fresh batch iterator on each call.
*
* The factory is invoked once per scan. Each iterator yields
* `RecordBatch`es matching the declared schema. Use this when you need
* direct control over the pull loop — for example, to wrap a streaming
* source whose batches are produced lazily.
*
* @param schema - The Arrow schema of the produced batches.
* @param factory - Called at the start of each scan to produce a batch
* iterator. Must be idempotent when `rescannable` is true.
* @param opts - Optional hints. `rescannable` defaults to `true`; set to
* `false` if calling `factory()` twice would not reproduce the same data.
*/
static async fromFactory(
schema: Schema,
factory: () =>
| AsyncIterable<RecordBatch>
| Iterable<RecordBatch>
| AsyncIterator<RecordBatch>
| Iterator<RecordBatch>,
opts: ScannableOptions = {},
): Promise<Scannable> {
const numRows = opts.numRows ?? null;
if (numRows != null && !Number.isInteger(numRows)) {
throw new TypeError("numRows must be an integer");
}
const rescannable = opts.rescannable ?? true;
let iter: AsyncIterator<RecordBatch> | Iterator<RecordBatch> | null = null;
const getNextBatch = async (isStart: boolean): Promise<Buffer | null> => {
// `isStart` is true on the first pull of every new scan_as_stream.
// Drop any cached iterator so factory() is re-invoked for the next scan
if (isStart) {
iter = null;
}
if (iter === null) {
iter = normalizeIterator(factory());
}
const result = await iter.next();
if (result.done) {
iter = null;
return null;
}
return fromRecordBatchToStreamBuffer(result.value);
};
const schemaBuf = await fromTableToBuffer(makeEmptyTable(schema));
const native = new NapiScannable(
schemaBuf,
numRows,
rescannable,
getNextBatch,
);
return new Scannable(native, schema, numRows, rescannable);
}
/**
* Build a Scannable from an in-memory Arrow `Table`. Always rescannable;
* the table's batches are replayed on each scan.
*
* The table's row count is authoritative: `opts.numRows` must either be
* omitted or equal to `table.numRows`. `opts.rescannable` of `false` is
* rejected because in-memory Tables are always rescannable.
*/
static async fromTable(
table: ArrowTable,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.numRows != null && opts.numRows !== table.numRows) {
throw new TypeError(
`opts.numRows (${opts.numRows}) does not match table.numRows (${table.numRows}). ` +
`The table's row count is authoritative; omit numRows or pass the matching value.`,
);
}
if (opts.rescannable === false) {
throw new TypeError(
`fromTable does not accept rescannable: false. ` +
`In-memory Arrow Tables are always rescannable; omit the option or pass true.`,
);
}
return Scannable.fromFactory(table.schema, () => table.batches, {
numRows: table.numRows,
rescannable: true,
});
}
/**
* Build a Scannable from an iterable of `RecordBatch`es. `rescannable`
* defaults to `false`. Pass an explicit schema so the consumer can
* validate before any batch is pulled.
*
* `opts.rescannable: true` is honest for replayable iterables (Arrays,
* Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh
* iterator each call). It is rejected for one-shot iterables (generators,
* async generators, or already-an-iterator inputs) because their
* `[Symbol.iterator]()` returns the same exhausted object on the second
* scan. For replayable sources outside this shape, use
* `fromFactory(schema, () => createIter(), { rescannable: true })`.
*
* Note: when `opts.rescannable` is `true`, the constructor calls
* `[Symbol.iterator]()` once on the input to perform the structural check.
*/
static async fromIterable(
schema: Schema,
iter: AsyncIterable<RecordBatch> | Iterable<RecordBatch>,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.rescannable === true && isOneShotIterable(iter)) {
throw new TypeError(
`fromIterable: rescannable: true is not honest for one-shot iterables ` +
`(generators, async generators, or iterators where [Symbol.iterator]() ` +
`returns the same object). The source would be exhausted after the first scan. ` +
`Use fromFactory(schema, () => createIter(), { rescannable: true }) for sources ` +
`where each call mints a fresh iterator.`,
);
}
return Scannable.fromFactory(schema, () => iter, {
numRows: opts.numRows,
rescannable: opts.rescannable ?? false,
});
}
/**
* Build a Scannable from an Arrow `RecordBatchReader`. A reader can only
* be consumed once; `rescannable` defaults to `false`.
*
* The reader must already be opened (via `.open()`) so its `.schema` is
* populated. `RecordBatchReader.from(...)` returns an unopened reader.
*
* `opts.rescannable: true` is rejected because `RecordBatchReader` is a
* self-iterator (its `[Symbol.iterator]()` returns itself), and this
* constructor does not call `reader.reset()` between scans, so a second
* scan would always see an exhausted reader. For genuinely replayable
* sources, use
* `fromFactory(schema, () => openReader(), { rescannable: true })`,
* which mints a fresh reader on each scan.
*/
static async fromRecordBatchReader(
reader: RecordBatchReader,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.rescannable === true) {
throw new TypeError(
`fromRecordBatchReader does not accept rescannable: true. ` +
`RecordBatchReader is a self-iterator (its [Symbol.iterator]() ` +
`returns itself) and would be exhausted after the first scan. ` +
`Use fromFactory(schema, () => openReader(), { rescannable: true }) ` +
`for sources where each call mints a fresh reader.`,
);
}
return Scannable.fromFactory(reader.schema, () => reader, {
numRows: opts.numRows,
rescannable: false,
});
}
}
function normalizeIterator<T>(
source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
): AsyncIterator<T> | Iterator<T> {
if (source == null) {
throw new TypeError("Scannable factory returned null/undefined");
}
if (
typeof (source as AsyncIterable<T>)[Symbol.asyncIterator] === "function"
) {
return (source as AsyncIterable<T>)[Symbol.asyncIterator]();
}
if (typeof (source as Iterable<T>)[Symbol.iterator] === "function") {
return (source as Iterable<T>)[Symbol.iterator]();
}
// Already an iterator (has `.next`).
if (typeof (source as Iterator<T>).next === "function") {
return source as Iterator<T>;
}
throw new TypeError("Scannable factory returned a non-iterable value");
}
// A "self-iterator" returns the same object from `[Symbol.iterator]()` /
// `[Symbol.asyncIterator]()`. Generators behave this way, so they exhaust
// after one pass. Replayable iterables (Array, Set, custom) return a fresh
// iterator each call. Detection mirrors `normalizeIterator`'s ordering so
// classification matches scan-time behavior.
function isOneShotIterable(
source: AsyncIterable<unknown> | Iterable<unknown>,
): boolean {
// null/undefined are not one-shot in any meaningful sense; let
// `normalizeIterator` raise the actual error at scan time.
if (source == null) return false;
const ref = source as unknown;
if (
typeof (source as AsyncIterable<unknown>)[Symbol.asyncIterator] ===
"function"
) {
const it = (source as AsyncIterable<unknown>)[
Symbol.asyncIterator
]() as unknown;
return it === ref;
}
if (typeof (source as Iterable<unknown>)[Symbol.iterator] === "function") {
const it = (source as Iterable<unknown>)[Symbol.iterator]() as unknown;
return it === ref;
}
// Already-an-iterator (has `.next` but no `Symbol.iterator`) is by
// definition one-shot.
if (typeof (source as { next?: unknown }).next === "function") return true;
return false;
}

View File

@@ -56,6 +56,18 @@ export interface AddDataOptions {
* If "overwrite" then the new data will replace the existing data in the table.
*/
mode: "append" | "overwrite";
/**
* If true, skip the automatic cleanup of old dataset versions that would
* otherwise run as part of this write's commit. Forwards to
* `WriteParams.skip_auto_cleanup` in lance-core.
*
* Useful for high-frequency writers that prefer to manage version cleanup
* themselves (for example, via a separate periodic optimize job), or for
* writers that don't have delete permissions on the underlying storage.
*
* Defaults to false.
*/
skipAutoCleanup?: boolean;
}
export interface UpdateOptions {
@@ -636,7 +648,7 @@ export class LocalTable extends Table {
const schema = await this.schema();
const buffer = await fromDataToBuffer(data, undefined, schema);
return await this.inner.add(buffer, mode);
return await this.inner.add(buffer, mode, options?.skipAutoCleanup);
}
async update(

View File

@@ -328,20 +328,6 @@ impl Connection {
.default_error()
}
#[napi(catch_unwind)]
pub async fn rename_table(
&self,
old_name: String,
new_name: String,
namespace_path: Option<Vec<String>>,
) -> napi::Result<()> {
let ns = namespace_path.unwrap_or_default();
self.get_inner()?
.rename_table(&old_name, &new_name, &ns, &ns)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn drop_all_tables(&self, namespace_path: Option<Vec<String>>) -> napi::Result<()> {
let ns = namespace_path.unwrap_or_default();

View File

@@ -16,7 +16,6 @@ pub mod permutation;
mod query;
pub mod remote;
mod rerankers;
mod scannable;
mod session;
mod table;
mod util;

View File

@@ -50,6 +50,13 @@ impl NativeMergeInsertBuilder {
this
}
#[napi]
pub fn skip_auto_cleanup(&self, skip: bool) -> Self {
let mut this = self.clone();
this.inner.skip_auto_cleanup(skip);
this
}
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
let data = ipc_file_to_batches(buf.to_vec())

View File

@@ -1,253 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! NodeJS binding for the [`lancedb::data::scannable::Scannable`] trait.
//!
//! The JS side supplies a `getNextBatch(isStart)` callback that returns the
//! next Arrow `RecordBatch` encoded as a self-contained Arrow IPC Stream
//! message (schema message + record batch message + EOS marker) wrapped in a
//! `Buffer`, or `null` when the stream is exhausted. The Rust side parses
//! each buffer with `arrow_ipc::reader::StreamReader`, validates every
//! standalone batch stream against the declared schema, and yields decoded
//! `RecordBatch`es as a [`SendableRecordBatchStream`].
//!
//! `isStart` is `true` on the first `getNextBatch` call of each new
//! `scan_as_stream` and `false` thereafter. JS uses it to drop any cached
//! iterator and re-invoke its factory at scan boundaries, so retries
//! triggered by mid-stream failures restart at batch 0.
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_ipc::reader::StreamReader;
use arrow_schema::SchemaRef;
use futures::stream::once;
use lancedb::arrow::{SendableRecordBatchStream, SimpleRecordBatchStream};
use lancedb::data::scannable::Scannable as LanceScannable;
use lancedb::ipc::ipc_file_to_schema;
use lancedb::{Error, Result as LanceResult};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::ThreadsafeFunction;
use napi_derive::napi;
/// Threadsafe handle to the JS `getNextBatch` callback. The callback takes a
/// single boolean `isStart` (`true` on the first call of each new scan) and
/// returns a Promise that resolves to a `Buffer` containing one IPC Stream
/// message, or `null` at end-of-stream.
type GetNextBatchFn = ThreadsafeFunction<bool, Promise<Option<Buffer>>, bool, Status, false>;
/// A Rust-side view of a JS-constructed `Scannable`.
///
/// Held in JS as the return value of the `Scannable` class constructor. When
/// passed to a consumer that accepts `impl lancedb::data::scannable::Scannable`,
/// the consumer invokes `scan_as_stream()` to pull batches through the JS
/// callback.
#[napi]
pub struct NapiScannable {
schema: SchemaRef,
num_rows: Option<usize>,
rescannable: bool,
// `ThreadsafeFunction` is not `Clone`; wrap in `Arc` so the stream
// returned by `scan_as_stream` can own a handle independent of `self`.
get_next_batch: Arc<GetNextBatchFn>,
// Tracks whether a scan has already started; used to enforce one-shot
// semantics on non-rescannable sources.
scanned: bool,
}
#[napi]
impl NapiScannable {
/// Construct a new `NapiScannable`.
///
/// - `schema_buf` — Arrow IPC File buffer carrying only the schema (no batches).
/// - `num_rows` — optional row count hint; not validated against the stream.
/// - `rescannable` — whether `get_next_batch` may be re-driven after the
/// scan completes.
/// - `get_next_batch` -- JS callback that yields the next batch as an Arrow
/// IPC Stream message wrapped in a `Buffer`, or `null` at EOF. The
/// `isStart` argument is `true` on the first call of each new scan;
/// JS uses it to discard any cached iterator before pulling.
#[napi(constructor)]
pub fn new(
schema_buf: Buffer,
num_rows: Option<i64>,
rescannable: bool,
get_next_batch: Function<bool, Promise<Option<Buffer>>>,
) -> napi::Result<Self> {
let schema = ipc_file_to_schema(schema_buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Invalid schema buffer: {}", e)))?;
let num_rows = num_rows
.map(|n| {
usize::try_from(n)
.map_err(|_| napi::Error::from_reason("num_rows must be non-negative"))
})
.transpose()?;
let get_next_batch = Arc::new(get_next_batch.build_threadsafe_function().build()?);
Ok(Self {
schema,
num_rows,
rescannable,
get_next_batch,
scanned: false,
})
}
}
impl std::fmt::Debug for NapiScannable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NapiScannable")
.field("schema", &self.schema)
.field("num_rows", &self.num_rows)
.field("rescannable", &self.rescannable)
.finish()
}
}
impl LanceScannable for NapiScannable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
let schema = self.schema.clone();
// One-shot enforcement for non-rescannable sources: return a stream
// whose first item is an error.
if self.scanned && !self.rescannable {
let err_stream = once(async {
Err(Error::InvalidInput {
message: "Scannable has already been consumed (non-rescannable source)"
.to_string(),
})
});
return Box::pin(SimpleRecordBatchStream::new(err_stream, schema));
}
self.scanned = true;
let tsfn = Arc::clone(&self.get_next_batch);
let declared_schema = schema.clone();
// State threaded through the unfold. `is_first_pull` starts true so
// the first call into JS signals a new-scan boundary; JS uses it to
// reset any cached iterator before factory()-ing a fresh one.
let initial = State {
tsfn,
batch_index: 0,
declared_schema,
errored: false,
is_first_pull: true,
};
let stream = futures::stream::unfold(initial, |mut state| async move {
if state.errored {
return None;
}
// Pull the next IPC Stream buffer from JS. `is_first_pull` is
// consumed here and cleared so subsequent pulls continue the
// same scan rather than restarting it.
let is_start = state.is_first_pull;
state.is_first_pull = false;
let buf = match pull_next(&state.tsfn, is_start).await {
Ok(Some(buf)) => buf,
Ok(None) => return None,
Err(e) => {
state.errored = true;
return Some((Err(e), state));
}
};
match decode_one_batch(buf.as_ref(), &state.declared_schema) {
Ok(batch) => {
state.batch_index += 1;
Some((Ok(batch), state))
}
Err(e) => {
let tagged = Error::Runtime {
message: format!(
"[scannable/rust-bridge] failure at batch index {}: {}",
state.batch_index, e
),
};
state.errored = true;
Some((Err(tagged), state))
}
}
});
Box::pin(SimpleRecordBatchStream::new(stream, schema))
}
fn num_rows(&self) -> Option<usize> {
self.num_rows
}
fn rescannable(&self) -> bool {
self.rescannable
}
}
struct State {
tsfn: Arc<GetNextBatchFn>,
batch_index: usize,
declared_schema: SchemaRef,
errored: bool,
/// True for the very first pull of a new scan. Forwarded to JS so the
/// callback can drop any cached iterator and call its factory fresh,
/// which makes rescannable sources restart at batch 0 even when the
/// previous scan ended mid-stream.
is_first_pull: bool,
}
/// Invoke the JS callback and await its Promise. `is_start` is forwarded to
/// the JS side as the `isStart` argument so it can reset its iterator at the
/// scan boundary. Errors on the JS side surface here as rejected promises
/// and are tunneled back as `lancedb::Error::Runtime`.
async fn pull_next(tsfn: &GetNextBatchFn, is_start: bool) -> LanceResult<Option<Buffer>> {
let promise = tsfn
.call_async(is_start)
.await
.map_err(|e| Error::Runtime {
message: format!(
"[scannable/js-factory] napi error status={}, reason={}",
e.status, e.reason
),
})?;
promise.await.map_err(|e| Error::Runtime {
message: format!(
"[scannable/js-iterator] napi error status={}, reason={}",
e.status, e.reason
),
})
}
/// Decode one IPC Stream buffer (schema + batch + EOS) into a `RecordBatch`.
/// Each buffer is a standalone IPC stream, so every decoded stream schema must
/// match the one declared at construction.
fn decode_one_batch(buf: &[u8], declared: &SchemaRef) -> LanceResult<RecordBatch> {
let reader = StreamReader::try_new(Cursor::new(buf), None).map_err(|e| Error::Runtime {
message: format!("failed to open IPC stream reader: {}", e),
})?;
let actual = reader.schema();
if actual.as_ref() != declared.as_ref() {
return Err(Error::InvalidInput {
message: format!(
"declared schema does not match stream schema: declared={:?} actual={:?}",
declared, actual
),
});
}
let mut iter = reader;
let batch = iter
.next()
.ok_or_else(|| Error::Runtime {
message: "IPC stream contained schema but no record batch".to_string(),
})?
.map_err(|e| Error::Runtime {
message: format!("failed to decode record batch: {}", e),
})?;
Ok(batch)
}

View File

@@ -6,7 +6,7 @@ use std::collections::HashMap;
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
OptimizeAction, OptimizeOptions, Table as LanceDbTable, WriteOptions,
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -68,7 +68,12 @@ impl Table {
}
#[napi(catch_unwind)]
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
pub async fn add(
&self,
buf: Buffer,
mode: String,
skip_auto_cleanup: Option<bool>,
) -> napi::Result<AddResult> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let batches = batches
@@ -92,6 +97,13 @@ impl Table {
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
};
if skip_auto_cleanup.unwrap_or(false) {
op = op.write_options(WriteOptions {
skip_auto_cleanup: true,
..Default::default()
});
}
let res = op.execute().await.default_error()?;
Ok(res.into())
}

View File

@@ -19,7 +19,6 @@ arrow = { version = "58.0.0", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
datafusion-common.workspace = true
lance-core.workspace = true
lance-namespace.workspace = true
lance-namespace-impls.workspace = true

View File

@@ -51,7 +51,7 @@ class PyExpr:
def to_sql(self) -> str: ...
def expr_col(name: str) -> PyExpr: ...
def expr_lit(value: Union[bool, int, float, str, bytes]) -> PyExpr: ...
def expr_lit(value: Union[bool, int, float, str]) -> PyExpr: ...
def expr_func(name: str, args: List[PyExpr]) -> PyExpr: ...
class Session:

View File

@@ -63,7 +63,7 @@ def _coerce(value: "ExprLike") -> "Expr":
# Type alias used in annotations.
ExprLike = Union["Expr", bool, int, float, str, bytes]
ExprLike = Union["Expr", bool, int, float, str]
class Expr:
@@ -261,13 +261,13 @@ def col(name: str) -> Expr:
return Expr(expr_col(name))
def lit(value: Union[bool, int, float, str, bytes]) -> Expr:
def lit(value: Union[bool, int, float, str]) -> Expr:
"""Create a literal (constant) value expression.
Parameters
----------
value:
A Python ``bool``, ``int``, ``float``, ``str``, or ``bytes``.
A Python ``bool``, ``int``, ``float``, or ``str``.
Examples
--------

View File

@@ -6,44 +6,22 @@
from typing import Optional
_CREATE_NAMESPACE_MODES = frozenset({"create", "exist_ok", "overwrite"})
_DROP_NAMESPACE_MODES = frozenset({"SKIP", "FAIL"})
_DROP_NAMESPACE_BEHAVIORS = frozenset({"RESTRICT", "CASCADE"})
def _normalize_create_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize create namespace mode to lowercase (API expects lowercase)."""
if mode is None:
return None
normalized = mode.lower()
if normalized not in _CREATE_NAMESPACE_MODES:
raise ValueError(
f"Invalid create namespace mode {mode!r}: "
f"expected one of 'create', 'exist_ok', 'overwrite'"
)
return normalized
return mode.lower()
def _normalize_drop_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize drop namespace mode to uppercase (API expects uppercase)."""
if mode is None:
return None
normalized = mode.upper()
if normalized not in _DROP_NAMESPACE_MODES:
raise ValueError(
f"Invalid drop namespace mode {mode!r}: expected one of 'skip', 'fail'"
)
return normalized
return mode.upper()
def _normalize_drop_namespace_behavior(behavior: Optional[str]) -> Optional[str]:
"""Normalize drop namespace behavior to uppercase (API expects uppercase)."""
if behavior is None:
return None
normalized = behavior.upper()
if normalized not in _DROP_NAMESPACE_BEHAVIORS:
raise ValueError(
f"Invalid drop namespace behavior {behavior!r}: "
f"expected one of 'restrict', 'cascade'"
)
return normalized
return behavior.upper()

View File

@@ -1,3 +1,4 @@
segmenter:
mode: "normal"
dictionary: "./python/tests/models/lindera/ipadic/main"
dictionary:
path: "./python/tests/models/lindera/ipadic/main"

View File

@@ -914,29 +914,6 @@ def test_local_namespace_operations(tmp_path):
assert db.list_namespaces().namespaces == []
def test_create_namespace_invalid_mode_raises(tmp_path):
"""Unrecognized create namespace modes raise a clear error."""
db = lancedb.connect(tmp_path)
with pytest.raises(ValueError, match="Invalid create namespace mode"):
db.create_namespace(["child"], mode="frobnicate")
def test_drop_namespace_invalid_mode_raises(tmp_path):
"""Unrecognized drop namespace modes raise a clear error."""
db = lancedb.connect(tmp_path)
db.create_namespace(["child"])
with pytest.raises(ValueError, match="Invalid drop namespace mode"):
db.drop_namespace(["child"], mode="frobnicate")
def test_drop_namespace_invalid_behavior_raises(tmp_path):
"""Unrecognized drop namespace behaviors raise a clear error."""
db = lancedb.connect(tmp_path)
db.create_namespace(["child"])
with pytest.raises(ValueError, match="Invalid drop namespace behavior"):
db.drop_namespace(["child"], behavior="frobnicate")
def test_clone_table_latest_version(tmp_path):
"""Test cloning a table with the latest version (default behavior)"""
import os

View File

@@ -116,7 +116,8 @@ def lindera_ipadic(language_model_home):
config_path.write_text(
"segmenter:\n"
' mode: "normal"\n'
f' dictionary: "{extracted_model.resolve().as_posix()}"\n',
" dictionary:\n"
f' path: "{extracted_model.resolve().as_posix()}"\n',
encoding="utf-8",
)

View File

@@ -395,17 +395,12 @@ impl Connection {
future_into_py(py, async move {
use lance_namespace::models::CreateNamespaceRequest;
// Mode is now a string field
let mode_str = mode
.map(|m| match m.to_lowercase().as_str() {
"create" => Ok("Create".to_string()),
"exist_ok" => Ok("ExistOk".to_string()),
"overwrite" => Ok("Overwrite".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid mode {:?}: expected one of 'create', 'exist_ok', 'overwrite'",
m
))),
})
.transpose()?;
let mode_str = mode.and_then(|m| match m.to_lowercase().as_str() {
"create" => Some("Create".to_string()),
"exist_ok" => Some("ExistOk".to_string()),
"overwrite" => Some("Overwrite".to_string()),
_ => None,
});
let request = CreateNamespaceRequest {
id: Some(namespace_path),
mode: mode_str,
@@ -433,26 +428,16 @@ impl Connection {
future_into_py(py, async move {
use lance_namespace::models::DropNamespaceRequest;
// Mode and Behavior are now string fields
let mode_str = mode
.map(|m| match m.to_uppercase().as_str() {
"SKIP" => Ok("Skip".to_string()),
"FAIL" => Ok("Fail".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid mode {:?}: expected one of 'skip', 'fail'",
m
))),
})
.transpose()?;
let behavior_str = behavior
.map(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Ok("Restrict".to_string()),
"CASCADE" => Ok("Cascade".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid behavior {:?}: expected one of 'restrict', 'cascade'",
b
))),
})
.transpose()?;
let mode_str = mode.and_then(|m| match m.to_uppercase().as_str() {
"SKIP" => Some("Skip".to_string()),
"FAIL" => Some("Fail".to_string()),
_ => None,
});
let behavior_str = behavior.and_then(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Some("Restrict".to_string()),
"CASCADE" => Some("Cascade".to_string()),
_ => None,
});
let request = DropNamespaceRequest {
id: Some(namespace_path),
mode: mode_str,

View File

@@ -8,9 +8,7 @@
//! DataFusion [`Expr`] nodes, bypassing SQL string parsing.
use arrow::{datatypes::DataType, pyarrow::PyArrowType};
use datafusion_common::ScalarValue;
use lancedb::expr::{DfExpr, col as ldb_col, contains, expr_cast, lit as df_lit, lower, upper};
use pyo3::types::PyBytes;
use pyo3::{Bound, PyAny, PyResult, exceptions::PyValueError, prelude::*, pyfunction};
/// A type-safe DataFusion expression.
@@ -143,7 +141,7 @@ pub fn expr_col(name: &str) -> PyExpr {
/// Create a literal value expression.
///
/// Supported Python types: `bool`, `int`, `float`, `str`, `bytes`.
/// Supported Python types: `bool`, `int`, `float`, `str`.
#[pyfunction]
pub fn expr_lit(value: Bound<'_, PyAny>) -> PyResult<PyExpr> {
// bool must be checked before int because bool is a subclass of int in Python
@@ -159,12 +157,8 @@ pub fn expr_lit(value: Bound<'_, PyAny>) -> PyResult<PyExpr> {
if let Ok(s) = value.extract::<String>() {
return Ok(PyExpr(df_lit(s)));
}
if value.is_instance_of::<PyBytes>() {
let bytes = value.extract::<Vec<u8>>()?;
return Ok(PyExpr(df_lit(ScalarValue::Binary(Some(bytes)))));
}
Err(PyValueError::new_err(format!(
"unsupported literal type: {}. Supported: bool, int, float, str, bytes",
"unsupported literal type: {}. Supported: bool, int, float, str",
value.get_type().name()?
)))
}

View File

@@ -33,14 +33,6 @@ class TestExprConstruction:
e = lit(True)
assert isinstance(e, Expr)
def test_lit_bytes(self):
e = lit(b"\xde\xad\xbe\xef")
assert isinstance(e, Expr)
def test_lit_bytes_empty(self):
e = lit(b"")
assert isinstance(e, Expr)
def test_lit_unsupported_type_raises(self):
with pytest.raises(Exception):
lit([1, 2, 3])
@@ -143,43 +135,6 @@ class TestExprOperators:
assert e.to_sql() == "(name = 'alice')"
class TestExprBytesLiteral:
def test_bytes_to_sql(self):
e = lit(b"\xde\xad\xbe\xef")
assert e.to_sql() == "X'DEADBEEF'"
def test_empty_bytes_to_sql(self):
e = lit(b"")
assert e.to_sql() == "X''"
def test_bytes_repr(self):
e = lit(b"\x01\x02")
assert repr(e) == "Expr(X'0102')"
def test_bytes_equality_expr_sql(self):
e = col("data") == lit(b"\xca\xfe")
assert e.to_sql() == "(data = X'CAFE')"
def test_bytes_ne_expr_sql(self):
e = col("data") != lit(b"\xff")
assert e.to_sql() == "(data <> X'FF')"
def test_bytes_compound_expr_sql(self):
e = (col("data") == lit(b"\x01")) & (col("id") > lit(5))
assert e.to_sql() == "((data = X'01') AND (id > 5))"
def test_bytes_in_function_call(self):
# Regression test: binary literals inside scalar function calls
# used to fail because DataFusion's unparser does not support Binary
# scalars. Now handled via a placeholder-substitution rewrite.
e = func("contains", col("data"), lit(b"\xff"))
assert e.to_sql() == "contains(data, X'FF')"
def test_bytes_in_not(self):
e = ~(col("data") == lit(b"\xff"))
assert e.to_sql() == "NOT (data = X'FF')"
class TestExprStringMethods:
def test_lower(self):
e = col("name").lower()
@@ -430,44 +385,3 @@ class TestColNamingIntegration:
)
assert "upper_name" in result.schema.names
assert sorted(result["upper_name"].to_pylist()) == ["ALICE", "BOB", "CHARLIE"]
# ── bytes / binary column integration tests ───────────────────────────────────
@pytest.fixture
def binary_table(tmp_path):
db = lancedb.connect(str(tmp_path))
data = pa.table(
{
"id": [1, 2, 3],
"payload": pa.array(
[b"\x01\x02", b"\xca\xfe", b"\xff\x00"],
type=pa.binary(),
),
}
)
return db.create_table("binary_test", data)
class TestExprBytesIntegration:
def test_binary_equality_filter(self, binary_table):
result = (
binary_table.search().where(col("payload") == lit(b"\xca\xfe")).to_arrow()
)
assert result.num_rows == 1
assert result["id"][0].as_py() == 2
def test_binary_ne_filter(self, binary_table):
result = (
binary_table.search().where(col("payload") != lit(b"\x01\x02")).to_arrow()
)
assert result.num_rows == 2
def test_binary_compound_filter(self, binary_table):
result = (
binary_table.search()
.where((col("payload") == lit(b"\x01\x02")) | (col("id") == lit(3)))
.to_arrow()
)
assert result.num_rows == 2

View File

@@ -849,6 +849,10 @@ impl ListingDatabase {
write_params.mode = WriteMode::Overwrite;
}
if request.write_options.skip_auto_cleanup {
write_params.skip_auto_cleanup = true;
}
write_params.session = Some(self.session.clone());
write_params
@@ -2034,6 +2038,7 @@ mod tests {
}),
..Default::default()
}),
..Default::default()
};
let table = db
@@ -2107,6 +2112,7 @@ mod tests {
}),
..Default::default()
}),
..Default::default()
};
let table = db

View File

@@ -11,7 +11,6 @@ use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
error::{ErrorCode, NamespaceError},
models::{
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
@@ -30,7 +29,7 @@ use crate::database::listing::{
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::error::{Error, Result};
use crate::table::{NativeTable, map_namespace_lance_error};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
use super::{
@@ -38,19 +37,6 @@ use super::{
Database, OpenTableRequest, TableNamesRequest,
};
/// Returns true if the given `lance::Error` (anywhere in its source chain) is a
/// `NamespaceError::TableAlreadyExists`.
fn is_table_already_exists_namespace_error(err: &lance::Error) -> bool {
let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err);
while let Some(e) = current {
if let Some(ns_err) = e.downcast_ref::<NamespaceError>() {
return ns_err.code() == ErrorCode::TableAlreadyExists;
}
current = e.source();
}
false
}
/// A database implementation that uses lance-namespace for table management
pub struct LanceNamespaceDatabase {
namespace: Arc<dyn LanceNamespace>,
@@ -370,15 +356,13 @@ impl Database for LanceNamespaceDatabase {
(loc, opts, response.managed_versioning)
}
Err(e)
if matches!(request.mode, CreateTableMode::Create)
&& is_table_already_exists_namespace_error(&e) =>
if matches!(request.mode, CreateTableMode::Create) && {
let err_str = e.to_string();
err_str.contains("already exists")
|| err_str.contains("TableAlreadyExists")
|| err_str.contains("table already exists")
} =>
{
// A declare conflict can either mean (a) the table was previously
// *declared* but never written (in which case we should proceed and
// create it), or (b) the table is fully realized (in which case the
// user is creating something that already exists and we should
// surface TableAlreadyExists). Disambiguate by describing the table
// and checking whether it has both a version and a schema.
let response = self
.namespace
.describe_table(DescribeTableRequest {
@@ -386,8 +370,11 @@ impl Database for LanceNamespaceDatabase {
..Default::default()
})
.await
.map_err(|describe_err| {
map_namespace_lance_error(describe_err, &request.name)
.map_err(|describe_err| Error::Runtime {
message: format!(
"Failed to describe existing declared table after declare conflict: {}",
describe_err
),
})?;
if response.version.is_some() && response.schema.is_some() {
@@ -407,7 +394,9 @@ impl Database for LanceNamespaceDatabase {
(loc, opts, response.managed_versioning)
}
Err(e) => {
return Err(map_namespace_lance_error(e, &request.name));
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
}
@@ -425,6 +414,10 @@ impl Database for LanceNamespaceDatabase {
params.mode = WriteMode::Overwrite;
}
if request.write_options.skip_auto_cleanup {
params.skip_auto_cleanup = true;
}
// Set up storage options if provided
if let Some(storage_opts) = initial_storage_options {
let store_params = params
@@ -1097,120 +1090,8 @@ mod tests {
.execute()
.await;
// Verify: Should return TableNotFound — not a generic Runtime/internal error
// (regression test for ENT-1235: open_table on missing table previously surfaced as
// a generic 500/Runtime error rather than TableNotFound).
match result {
Err(Error::TableNotFound { name, .. }) => {
assert_eq!(name, "non_existent_table");
}
Err(other) => panic!("Expected TableNotFound, got: {:?}", other),
Ok(_) => panic!("Expected open_table to fail, but it succeeded"),
}
}
#[tokio::test]
async fn test_namespace_open_table_not_found_at_root() {
// Same as above, but at the root namespace (no parent namespace creation).
// Covers the common code path used by `db.open_table("foo")` without a namespace.
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.execute()
.await
.expect("Failed to connect to namespace");
let result = conn.open_table("missing_at_root").execute().await;
match result {
Err(Error::TableNotFound { name, .. }) => {
assert_eq!(name, "missing_at_root");
}
Err(other) => panic!("Expected TableNotFound, got: {:?}", other),
Ok(_) => panic!("Expected open_table to fail, but it succeeded"),
}
}
#[tokio::test]
async fn test_namespace_create_table_already_exists() {
// Regression test for ENT-1235: create_table on an existing table (in default
// Create mode) should return TableAlreadyExists, not a generic Runtime/500 error.
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_namespace(CreateNamespaceRequest {
id: Some(vec!["test_ns".into()]),
..Default::default()
})
.await
.expect("Failed to create namespace");
// Create the table once.
conn.create_table("dup_table", create_test_data())
.namespace(vec!["test_ns".into()])
.execute()
.await
.expect("Failed to create table the first time");
// Try to create it again with the default Create mode.
let result = conn
.create_table("dup_table", create_test_data())
.namespace(vec!["test_ns".into()])
.execute()
.await;
match result {
Err(Error::TableAlreadyExists { name }) => {
assert_eq!(name, "dup_table");
}
Err(other) => panic!("Expected TableAlreadyExists, got: {:?}", other),
Ok(_) => panic!("Expected create_table to fail, but it succeeded"),
}
}
#[tokio::test]
async fn test_namespace_create_table_already_exists_at_root() {
// Same as above, but at the root namespace.
let tmp_dir = tempdir().unwrap();
let root_path = tmp_dir.path().to_str().unwrap().to_string();
let mut properties = HashMap::new();
properties.insert("root".to_string(), root_path);
let conn = connect_namespace("dir", properties)
.execute()
.await
.expect("Failed to connect to namespace");
conn.create_table("dup_root", create_test_data())
.execute()
.await
.expect("Failed to create table the first time");
let result = conn
.create_table("dup_root", create_test_data())
.execute()
.await;
match result {
Err(Error::TableAlreadyExists { name }) => {
assert_eq!(name, "dup_root");
}
Err(other) => panic!("Expected TableAlreadyExists, got: {:?}", other),
Ok(_) => panic!("Expected create_table to fail, but it succeeded"),
}
// Verify: Should return an error
assert!(result.is_err());
}
#[tokio::test]

View File

@@ -138,69 +138,4 @@ mod tests {
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.contains("price"));
}
#[test]
fn test_binary_literal() {
use datafusion_common::ScalarValue;
let expr = lit(ScalarValue::Binary(Some(vec![0xde, 0xad, 0xbe, 0xef])));
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "X'DEADBEEF'");
}
#[test]
fn test_binary_literal_in_filter() {
use datafusion_common::ScalarValue;
let expr = col("data").eq(lit(ScalarValue::Binary(Some(vec![0xca, 0xfe]))));
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "(data = X'CAFE')");
}
#[test]
fn test_binary_literal_compound() {
use datafusion_common::ScalarValue;
let bin_expr = col("data").eq(lit(ScalarValue::Binary(Some(vec![0x01]))));
let int_expr = col("id").gt(lit(5i64));
let combined = bin_expr.and(int_expr);
let sql = expr_to_sql_string(&combined).unwrap();
assert_eq!(sql, "((data = X'01') AND (id > 5))");
}
#[test]
fn test_null_binary_literal() {
use datafusion_common::ScalarValue;
let expr = lit(ScalarValue::Binary(None));
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "NULL");
}
#[test]
fn test_binary_literal_in_function_call() {
use datafusion_common::ScalarValue;
// Binary literals inside scalar function arguments must also be
// serialized correctly (regression test for placeholder rewrite path).
let expr = contains(col("data"), lit(ScalarValue::Binary(Some(vec![0xff]))));
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "contains(data, X'FF')");
}
#[test]
fn test_binary_literal_in_negation() {
use datafusion_common::ScalarValue;
use std::ops::Not;
let expr = col("data")
.eq(lit(ScalarValue::Binary(Some(vec![0xab, 0xcd]))))
.not();
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "NOT (data = X'ABCD')");
}
#[test]
fn test_multiple_binary_literals() {
use datafusion_common::ScalarValue;
let lhs = col("a").eq(lit(ScalarValue::Binary(Some(vec![0x01]))));
let rhs = col("b").eq(lit(ScalarValue::Binary(Some(vec![0x02, 0x03]))));
let expr = lhs.and(rhs);
let sql = expr_to_sql_string(&expr).unwrap();
assert_eq!(sql, "((a = X'01') AND (b = X'0203'))");
}
}

View File

@@ -1,8 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use datafusion_common::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_expr::Expr;
use datafusion_sql::unparser::{self, dialect::Dialect};
@@ -30,36 +28,7 @@ impl Dialect for LanceSqlDialect {
}
}
/// Prefix for placeholder strings inserted in place of binary literals. Chosen
/// to be extremely unlikely to occur in user data.
const BINARY_PLACEHOLDER_PREFIX: &str = "__lancedb_binary_placeholder_";
fn bytes_to_hex_sql(bytes: &[u8]) -> String {
let hex: String = bytes.iter().map(|b| format!("{b:02X}")).collect();
format!("X'{hex}'")
}
/// Returns true if *expr* contains a `Binary` or `LargeBinary` scalar literal
/// anywhere in its subtree. DataFusion's SQL unparser cannot serialize those
/// variants, so we route such expressions through a placeholder-substitution
/// path that emits SQL `X'...'` byte-string literals.
fn has_binary_literal(expr: &Expr) -> bool {
let mut found = false;
let _ = expr.apply(&mut |e: &Expr| {
if matches!(
e,
Expr::Literal(ScalarValue::Binary(_) | ScalarValue::LargeBinary(_), _)
) {
found = true;
Ok(TreeNodeRecursion::Stop)
} else {
Ok(TreeNodeRecursion::Continue)
}
});
found
}
fn run_unparser(expr: &Expr) -> crate::Result<String> {
pub fn expr_to_sql_string(expr: &Expr) -> crate::Result<String> {
let ast = unparser::Unparser::new(&LanceSqlDialect)
.expr_to_sql(expr)
.map_err(|e| crate::Error::InvalidInput {
@@ -67,49 +36,3 @@ fn run_unparser(expr: &Expr) -> crate::Result<String> {
})?;
Ok(ast.to_string())
}
pub fn expr_to_sql_string(expr: &Expr) -> crate::Result<String> {
// Fast path: no binary literals — DataFusion's unparser handles everything.
if !has_binary_literal(expr) {
return run_unparser(expr);
}
// Slow path: DataFusion's unparser cannot serialize `Binary`/`LargeBinary`
// scalars, so we rewrite each one to a unique string-literal placeholder,
// let the unparser do the rest of the work, then substitute the SQL
// `X'...'` byte-string literal back in. This keeps the operator/function
// serialization logic centralized in DataFusion and works for every
// expression node type the unparser supports.
let mut bindings: Vec<Vec<u8>> = Vec::new();
let rewritten = expr
.clone()
.transform(|e: Expr| match e {
Expr::Literal(ScalarValue::Binary(Some(bytes)), m)
| Expr::Literal(ScalarValue::LargeBinary(Some(bytes)), m) => {
let placeholder = format!("{}{}__", BINARY_PLACEHOLDER_PREFIX, bindings.len());
bindings.push(bytes);
Ok(Transformed::yes(Expr::Literal(
ScalarValue::Utf8(Some(placeholder)),
m,
)))
}
Expr::Literal(ScalarValue::Binary(None), m)
| Expr::Literal(ScalarValue::LargeBinary(None), m) => {
Ok(Transformed::yes(Expr::Literal(ScalarValue::Null, m)))
}
other => Ok(Transformed::no(other)),
})
.map_err(|e| crate::Error::InvalidInput {
message: format!("failed to rewrite expression: {}", e),
})?
.data;
let mut sql = run_unparser(&rewritten)?;
for (i, bytes) in bindings.iter().enumerate() {
// The unparser quotes string literals with single quotes, so the
// placeholder appears as `'__lancedb_binary_placeholder_<i>__'`.
let quoted = format!("'{}{}__'", BINARY_PLACEHOLDER_PREFIX, i);
sql = sql.replace(&quoted, &bytes_to_hex_sql(bytes));
}
Ok(sql)
}

View File

@@ -234,6 +234,7 @@ mod test {
.create_table("test", data)
.write_options(WriteOptions {
lance_write_params: Some(param),
..Default::default()
})
.execute()
.await;

View File

@@ -36,7 +36,6 @@ pub use query::AnyQuery;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_namespace::LanceNamespace;
use lance_namespace::error::NamespaceError;
use lance_namespace::models::DescribeTableRequest;
use lance_table::format::Manifest;
use lance_table::io::commit::CommitHandler;
@@ -95,53 +94,6 @@ pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResu
use serde_with::skip_serializing_none;
pub use update::{UpdateBuilder, UpdateResult};
/// Walk a boxed error chain to find the innermost `NamespaceError`.
///
/// Callers like `DatasetBuilder::from_namespace` re-wrap their inner namespace error
/// inside a fresh `lance::Error::Namespace`, so a single downcast at the top level
/// won't find it. This walks `.source()` to unwrap arbitrarily nested layers.
fn find_namespace_error<'a>(
err: &'a (dyn std::error::Error + 'static),
) -> Option<&'a NamespaceError> {
let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err);
while let Some(e) = current {
if let Some(ns_err) = e.downcast_ref::<NamespaceError>() {
return Some(ns_err);
}
current = e.source();
}
None
}
/// Map a `lance::Error` coming from a `lance-namespace` call into a `lancedb::Error`,
/// preserving the fine-grained namespace error code (e.g. `TableNotFound`,
/// `TableAlreadyExists`). Errors that aren't recognized namespace error variants fall
/// through to a generic runtime error rather than `TableNotFound`/`TableAlreadyExists`.
pub(crate) fn map_namespace_lance_error(err: lance::Error, table_name: &str) -> Error {
if let Some(code) = find_namespace_error(&err).map(NamespaceError::code) {
match code {
lance_namespace::error::ErrorCode::TableNotFound => {
return Error::TableNotFound {
name: table_name.to_string(),
source: Box::new(err),
};
}
lance_namespace::error::ErrorCode::TableAlreadyExists => {
return Error::TableAlreadyExists {
name: table_name.to_string(),
};
}
_ => {}
}
}
match err {
lance::Error::Namespace { source, .. } => Error::Runtime {
message: format!("Namespace error: {}", source),
},
other => other.into(),
}
}
/// Defines the type of column
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
@@ -237,6 +189,18 @@ pub struct WriteOptions {
// Coming soon: https://github.com/lancedb/lancedb/issues/992
// /// What behavior to take if the data contains invalid vectors
// pub on_bad_vectors: BadVectorHandling,
/// If true, skip the automatic cleanup of old dataset versions that would
/// otherwise run during the commit. This forwards to
/// [`WriteParams::skip_auto_cleanup`] in lance-core.
///
/// Useful for high-frequency writers that want to manage version cleanup
/// themselves (e.g. via a periodic optimize job), or for writers that
/// lack delete permissions on the underlying storage.
///
/// If `lance_write_params` is also set with `skip_auto_cleanup = true`,
/// the cleanup is skipped. Setting this field to `true` forces the flag
/// on regardless of `lance_write_params`.
pub skip_auto_cleanup: bool,
/// Advanced parameters that can be used to customize table creation
///
/// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take
@@ -1542,7 +1506,12 @@ impl NativeTable {
// and storage options from the namespace
let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
.await
.map_err(|e| map_namespace_lance_error(e, name))?;
.map_err(|e| match e {
lance::Error::Namespace { source, .. } => Error::Runtime {
message: format!("Failed to get table info from namespace: {:?}", source),
},
e => e.into(),
})?;
let dataset = builder
.with_read_params(params)
@@ -2326,7 +2295,8 @@ impl BaseTable for NativeTable {
let output = add.into_plan(&table_schema, &table_def)?;
let lance_params = output
let skip_auto_cleanup = output.write_options.skip_auto_cleanup;
let mut lance_params = output
.write_options
.lance_write_params
.unwrap_or(WriteParams {
@@ -2336,6 +2306,9 @@ impl BaseTable for NativeTable {
},
..Default::default()
});
if skip_auto_cleanup {
lance_params.skip_auto_cleanup = true;
}
// Repartition for write parallelism if beneficial.
let plan = if num_partitions > 1 {

View File

@@ -441,6 +441,7 @@ mod tests {
.add(new_batch.clone())
.write_options(WriteOptions {
lance_write_params: Some(param),
..Default::default()
})
.mode(AddDataMode::Append)
.execute()
@@ -761,4 +762,56 @@ mod tests {
table2.add(struct_batch).execute().await.unwrap();
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
}
#[tokio::test]
async fn test_add_skip_auto_cleanup() {
// Verifies WriteOptions::skip_auto_cleanup is forwarded to lance-core's
// WriteParams and actually suppresses the cleanup hook on commit.
let tmp_dir = tempfile::tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn = connect(uri).execute().await.unwrap();
let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
let table = conn.create_table("t", batch).execute().await.unwrap();
// Cleanup on every commit, with `older_than = 0s` so prior versions are
// immediately eligible.
table
.as_native()
.unwrap()
.update_config(vec![
("lance.auto_cleanup.interval".to_string(), "1".to_string()),
(
"lance.auto_cleanup.older_than".to_string(),
"0s".to_string(),
),
])
.await
.unwrap();
// Write several versions with skip_auto_cleanup; none should be removed.
for i in 0..3 {
let new_batch = record_batch!(("id", Int64, [10 + i])).unwrap();
table
.add(new_batch)
.write_options(WriteOptions {
skip_auto_cleanup: true,
..Default::default()
})
.execute()
.await
.unwrap();
}
let versions_before = table.list_versions().await.unwrap().len();
// Now write one more without the flag; cleanup should run and prune.
let new_batch = record_batch!(("id", Int64, [42])).unwrap();
table.add(new_batch).execute().await.unwrap();
let versions_after = table.list_versions().await.unwrap().len();
assert!(
versions_after < versions_before,
"auto-cleanup should have removed old versions once the skip flag was off \
(before={versions_before}, after={versions_after})"
);
}
}

View File

@@ -219,6 +219,7 @@ impl ExecutionPlan for InsertExec {
&& let Some(merged_txn) = merge_transactions(transactions)
{
let new_dataset = CommitBuilder::new(dataset.clone())
.with_skip_auto_cleanup(write_params.skip_auto_cleanup)
.execute(merged_txn)
.await?;
ds_wrapper.update(new_dataset);

View File

@@ -528,6 +528,7 @@ mod tests {
}),
..Default::default()
}),
..Default::default()
})
.execute()
.await
@@ -589,6 +590,7 @@ mod tests {
}),
..Default::default()
}),
..Default::default()
})
.execute()
.await

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) skip_auto_cleanup: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
skip_auto_cleanup: false,
}
}
@@ -148,6 +150,17 @@ impl MergeInsertBuilder {
self
}
/// Skip the automatic cleanup of old dataset versions that would otherwise
/// run during the merge insert commit.
///
/// This forwards to [`lance::dataset::MergeInsertBuilder::skip_auto_cleanup`]
/// in lance-core. Useful for high-frequency writers that want to manage
/// version cleanup themselves, or writers without delete permissions.
pub fn skip_auto_cleanup(&mut self, skip: bool) -> &mut Self {
self.skip_auto_cleanup = skip;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
@@ -191,6 +204,9 @@ pub(crate) async fn execute_merge_insert(
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
}
builder.use_index(params.use_index);
if params.skip_auto_cleanup {
builder.skip_auto_cleanup(true);
}
let future = if let Some(timeout) = params.timeout {
let future = builder