mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-20 05:20:40 +00:00
chore: remove vectordb package (#2564)
```shell git rm -r rust/ffi git rm -r node git rm ci/build_windows_artifacts.ps1 git rm ci/build_windows_artifacts_nodejs.ps1 git rm ci/build_linux_artifacts.sh git rm ci/build_macos_artifacts.sh git rm -r ci/manylinux_node git rm .github/workflows/node.yml ```
This commit is contained in:
@@ -1,42 +0,0 @@
|
||||
[package]
|
||||
name = "lancedb-node"
|
||||
version = "0.21.2"
|
||||
description = "Serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
edition.workspace = true
|
||||
repository.workspace = true
|
||||
keywords.workspace = true
|
||||
categories.workspace = true
|
||||
exclude = ["index.node"]
|
||||
rust-version = "1.75"
|
||||
|
||||
[lib]
|
||||
crate-type = ["cdylib"]
|
||||
|
||||
[dependencies]
|
||||
arrow-array = { workspace = true }
|
||||
arrow-ipc = { workspace = true }
|
||||
arrow-schema = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
conv = "0.3.3"
|
||||
once_cell = "1"
|
||||
futures = "0.3"
|
||||
half = { workspace = true }
|
||||
lance = { workspace = true }
|
||||
lance-index = { workspace = true }
|
||||
lance-linalg = { workspace = true }
|
||||
lancedb = { path = "../../lancedb" }
|
||||
tokio = { version = "1.23", features = ["rt-multi-thread"] }
|
||||
neon = { version = "0.10.1", default-features = false, features = [
|
||||
"channel-api",
|
||||
"napi-6",
|
||||
"promise-api",
|
||||
"task-api",
|
||||
] }
|
||||
object_store = { workspace = true, features = ["aws"] }
|
||||
snafu = { workspace = true }
|
||||
async-trait = "0"
|
||||
env_logger = "0"
|
||||
|
||||
# Prevent dynamic linking of lzma, which comes from datafusion
|
||||
lzma-sys = { version = "*", features = ["static"] }
|
||||
@@ -1,3 +0,0 @@
|
||||
The LanceDB node bridge (lancedb-node) allows javascript applications to access LanceDB datasets.
|
||||
|
||||
It is build using [Neon](https://neon-bindings.com). See the node project for an example of how it is used / tests
|
||||
@@ -1,37 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use std::io::Cursor;
|
||||
use std::ops::Deref;
|
||||
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_ipc::reader::FileReader;
|
||||
use arrow_ipc::writer::FileWriter;
|
||||
use arrow_schema::SchemaRef;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
pub fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
|
||||
let mut batches: Vec<RecordBatch> = Vec::new();
|
||||
let file_reader = FileReader::try_new(Cursor::new(slice), None)?;
|
||||
let schema = file_reader.schema();
|
||||
for b in file_reader {
|
||||
let record_batch = b?;
|
||||
batches.push(record_batch);
|
||||
}
|
||||
Ok((batches, schema))
|
||||
}
|
||||
|
||||
pub fn record_batch_to_buffer(batches: Vec<RecordBatch>) -> Result<Vec<u8>> {
|
||||
if batches.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let schema = batches.first().unwrap().schema();
|
||||
let mut fr = FileWriter::try_new(Vec::new(), schema.deref())?;
|
||||
for batch in batches.iter() {
|
||||
fr.write(batch)?
|
||||
}
|
||||
fr.finish()?;
|
||||
Ok(fr.into_inner()?)
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use neon::prelude::*;
|
||||
use neon::types::buffer::TypedArray;
|
||||
|
||||
use crate::error::ResultExt;
|
||||
|
||||
pub fn vec_str_to_array<'a, C: Context<'a>>(vec: &[String], cx: &mut C) -> JsResult<'a, JsArray> {
|
||||
let a = JsArray::new(cx, vec.len() as u32);
|
||||
for (i, s) in vec.iter().enumerate() {
|
||||
let v = cx.string(s);
|
||||
a.set(cx, i as u32, v)?;
|
||||
}
|
||||
Ok(a)
|
||||
}
|
||||
|
||||
pub fn js_array_to_vec(array: &JsArray, cx: &mut FunctionContext) -> Vec<f32> {
|
||||
let mut query_vec: Vec<f32> = Vec::new();
|
||||
for i in 0..array.len(cx) {
|
||||
let entry: Handle<JsNumber> = array.get(cx, i).unwrap();
|
||||
query_vec.push(entry.value(cx) as f32);
|
||||
}
|
||||
query_vec
|
||||
}
|
||||
|
||||
// Creates a new JsBuffer from a rust buffer with a special logic for electron
|
||||
pub fn new_js_buffer<'a>(
|
||||
buffer: Vec<u8>,
|
||||
cx: &mut TaskContext<'a>,
|
||||
is_electron: bool,
|
||||
) -> NeonResult<Handle<'a, JsBuffer>> {
|
||||
if is_electron {
|
||||
// Electron does not support `external`: https://github.com/neon-bindings/neon/pull/937
|
||||
let mut js_buffer = JsBuffer::new(cx, buffer.len()).or_throw(cx)?;
|
||||
let buffer_data = js_buffer.as_mut_slice(cx);
|
||||
buffer_data.copy_from_slice(buffer.as_slice());
|
||||
Ok(js_buffer)
|
||||
} else {
|
||||
Ok(JsBuffer::external(cx, buffer))
|
||||
}
|
||||
}
|
||||
@@ -1,86 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use arrow_schema::ArrowError;
|
||||
use neon::context::Context;
|
||||
use neon::prelude::NeonResult;
|
||||
use snafu::Snafu;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[allow(dead_code)]
|
||||
#[snafu(display("column '{name}' is missing"))]
|
||||
MissingColumn { name: String },
|
||||
#[snafu(display("{name}: {message}"))]
|
||||
OutOfRange { name: String, message: String },
|
||||
#[allow(dead_code)]
|
||||
#[snafu(display("{index_type} is not a valid index type"))]
|
||||
InvalidIndexType { index_type: String },
|
||||
|
||||
#[snafu(display("{message}"))]
|
||||
LanceDB { message: String },
|
||||
#[snafu(display("{message}"))]
|
||||
Neon { message: String },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl From<lancedb::error::Error> for Error {
|
||||
fn from(e: lancedb::error::Error) -> Self {
|
||||
Self::LanceDB {
|
||||
message: e.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<lance::Error> for Error {
|
||||
fn from(e: lance::Error) -> Self {
|
||||
Self::LanceDB {
|
||||
message: e.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ArrowError> for Error {
|
||||
fn from(value: ArrowError) -> Self {
|
||||
Self::LanceDB {
|
||||
message: value.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<neon::result::Throw> for Error {
|
||||
fn from(value: neon::result::Throw) -> Self {
|
||||
Self::Neon {
|
||||
message: value.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<std::sync::mpsc::SendError<T>> for Error {
|
||||
fn from(value: std::sync::mpsc::SendError<T>) -> Self {
|
||||
Self::Neon {
|
||||
message: value.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// ResultExt is used to transform a [`Result`] into a [`NeonResult`],
|
||||
/// so it can be returned as a JavaScript error
|
||||
/// Copied from [Neon](https://github.com/neon-bindings/neon/blob/4c2e455a9e6814f1ba0178616d63caec7f4df317/crates/neon/src/result/mod.rs#L88)
|
||||
pub trait ResultExt<T> {
|
||||
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T>;
|
||||
}
|
||||
|
||||
/// Implement ResultExt for the std Result so it can be used any Result type
|
||||
impl<T, E> ResultExt<T> for std::result::Result<T, E>
|
||||
where
|
||||
E: std::fmt::Display,
|
||||
{
|
||||
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
|
||||
match self {
|
||||
Ok(value) => Ok(value),
|
||||
Err(error) => cx.throw_error(error.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
pub mod scalar;
|
||||
pub mod vector;
|
||||
@@ -1,37 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use lancedb::index::{scalar::BTreeIndexBuilder, Index};
|
||||
use neon::{
|
||||
context::{Context, FunctionContext},
|
||||
result::JsResult,
|
||||
types::{JsBoolean, JsBox, JsPromise, JsString},
|
||||
};
|
||||
|
||||
use crate::{error::ResultExt, runtime, table::JsTable};
|
||||
|
||||
pub fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
|
||||
let column = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let replace = cx.argument::<JsBoolean>(1)?.value(&mut cx);
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let idx_result = table
|
||||
.create_index(&[column], Index::BTree(BTreeIndexBuilder::default()))
|
||||
.replace(replace)
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
idx_result.or_throw(&mut cx)?;
|
||||
Ok(cx.undefined())
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use lancedb::index::vector::IvfPqIndexBuilder;
|
||||
use lancedb::index::Index;
|
||||
use lancedb::DistanceType;
|
||||
use neon::context::FunctionContext;
|
||||
use neon::prelude::*;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use crate::error::ResultExt;
|
||||
use crate::neon_ext::js_object_ext::JsObjectExt;
|
||||
use crate::runtime;
|
||||
use crate::table::JsTable;
|
||||
|
||||
pub fn table_create_vector_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
|
||||
let index_params = cx.argument::<JsObject>(0)?;
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let column_name = index_params
|
||||
.get_opt::<JsString, _, _>(&mut cx, "column")?
|
||||
.map(|s| s.value(&mut cx))
|
||||
.unwrap_or("vector".to_string()); // Backward compatibility
|
||||
|
||||
let replace = index_params
|
||||
.get_opt::<JsBoolean, _, _>(&mut cx, "replace")?
|
||||
.map(|r| r.value(&mut cx));
|
||||
|
||||
let tbl = table.clone();
|
||||
let ivf_pq_builder = get_index_params_builder(&mut cx, index_params).or_throw(&mut cx)?;
|
||||
|
||||
let mut index_builder = tbl.create_index(&[column_name], Index::IvfPq(ivf_pq_builder));
|
||||
if let Some(replace) = replace {
|
||||
index_builder = index_builder.replace(replace);
|
||||
}
|
||||
|
||||
rt.spawn(async move {
|
||||
let idx_result = index_builder.execute().await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
idx_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(JsTable::from(table)))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
fn get_index_params_builder(
|
||||
cx: &mut FunctionContext,
|
||||
obj: Handle<JsObject>,
|
||||
) -> crate::error::Result<IvfPqIndexBuilder> {
|
||||
if obj.get_opt::<JsString, _, _>(cx, "index_name")?.is_some() {
|
||||
return Err(crate::error::Error::LanceDB {
|
||||
message: "Setting the index_name is no longer supported".to_string(),
|
||||
});
|
||||
}
|
||||
let mut builder = IvfPqIndexBuilder::default();
|
||||
if let Some(metric_type) = obj.get_opt::<JsString, _, _>(cx, "metric_type")? {
|
||||
let distance_type = DistanceType::try_from(metric_type.value(cx).as_str())?;
|
||||
builder = builder.distance_type(distance_type);
|
||||
}
|
||||
if let Some(np) = obj.get_opt_u32(cx, "num_partitions")? {
|
||||
builder = builder.num_partitions(np);
|
||||
}
|
||||
if let Some(ns) = obj.get_opt_u32(cx, "num_sub_vectors")? {
|
||||
builder = builder.num_sub_vectors(ns);
|
||||
}
|
||||
if let Some(max_iters) = obj.get_opt_u32(cx, "max_iters")? {
|
||||
builder = builder.max_iterations(max_iters);
|
||||
}
|
||||
Ok(builder)
|
||||
}
|
||||
@@ -1,174 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use neon::prelude::*;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use lancedb::connect;
|
||||
use lancedb::connection::Connection;
|
||||
|
||||
use crate::error::ResultExt;
|
||||
use crate::query::JsQuery;
|
||||
use crate::table::JsTable;
|
||||
|
||||
mod arrow;
|
||||
mod convert;
|
||||
mod error;
|
||||
mod index;
|
||||
mod neon_ext;
|
||||
mod query;
|
||||
mod table;
|
||||
|
||||
struct JsDatabase {
|
||||
database: Connection,
|
||||
}
|
||||
|
||||
impl Finalize for JsDatabase {}
|
||||
|
||||
fn runtime<'a, C: Context<'a>>(cx: &mut C) -> NeonResult<&'static Runtime> {
|
||||
static RUNTIME: OnceCell<Runtime> = OnceCell::new();
|
||||
static LOG: OnceCell<()> = OnceCell::new();
|
||||
|
||||
LOG.get_or_init(env_logger::init);
|
||||
|
||||
RUNTIME.get_or_try_init(|| Runtime::new().or_throw(cx))
|
||||
}
|
||||
|
||||
fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let path = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let read_consistency_interval = cx
|
||||
.argument_opt(2)
|
||||
.and_then(|arg| arg.downcast::<JsNumber, _>(&mut cx).ok())
|
||||
.map(|v| v.value(&mut cx))
|
||||
.map(std::time::Duration::from_secs_f64);
|
||||
|
||||
let storage_options_js = cx.argument::<JsArray>(1)?.to_vec(&mut cx)?;
|
||||
let mut storage_options: Vec<(String, String)> = Vec::with_capacity(storage_options_js.len());
|
||||
for handle in storage_options_js {
|
||||
let obj = handle.downcast::<JsArray, _>(&mut cx).unwrap();
|
||||
let key = obj.get::<JsString, _, _>(&mut cx, 0)?.value(&mut cx);
|
||||
let value = obj.get::<JsString, _, _>(&mut cx, 1)?.value(&mut cx);
|
||||
|
||||
storage_options.push((key, value));
|
||||
}
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let channel = cx.channel();
|
||||
let (deferred, promise) = cx.promise();
|
||||
|
||||
let mut conn_builder = connect(&path).storage_options(storage_options);
|
||||
|
||||
if let Some(interval) = read_consistency_interval {
|
||||
conn_builder = conn_builder.read_consistency_interval(interval);
|
||||
}
|
||||
rt.spawn(async move {
|
||||
let database = conn_builder.execute().await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let db = JsDatabase {
|
||||
database: database.or_throw(&mut cx)?,
|
||||
};
|
||||
Ok(cx.boxed(db))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
fn database_table_names(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let db = cx
|
||||
.this()
|
||||
.downcast_or_throw::<JsBox<JsDatabase>, _>(&mut cx)?;
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let database = db.database.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let tables_rst = database.table_names().execute().await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let tables = tables_rst.or_throw(&mut cx)?;
|
||||
let table_names = convert::vec_str_to_array(&tables, &mut cx);
|
||||
table_names
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
fn database_open_table(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let db = cx
|
||||
.this()
|
||||
.downcast_or_throw::<JsBox<JsDatabase>, _>(&mut cx)?;
|
||||
let table_name = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let channel = cx.channel();
|
||||
let database = db.database.clone();
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
rt.spawn(async move {
|
||||
let table_rst = database.open_table(&table_name).execute().await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let js_table = JsTable::from(table_rst.or_throw(&mut cx)?);
|
||||
Ok(cx.boxed(js_table))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
fn database_drop_table(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let db = cx
|
||||
.this()
|
||||
.downcast_or_throw::<JsBox<JsDatabase>, _>(&mut cx)?;
|
||||
let table_name = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let channel = cx.channel();
|
||||
let database = db.database.clone();
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
rt.spawn(async move {
|
||||
let result = database.drop_table(&table_name).await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
result.or_throw(&mut cx)?;
|
||||
Ok(cx.null())
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
#[neon::main]
|
||||
fn main(mut cx: ModuleContext) -> NeonResult<()> {
|
||||
cx.export_function("databaseNew", database_new)?;
|
||||
cx.export_function("databaseTableNames", database_table_names)?;
|
||||
cx.export_function("databaseOpenTable", database_open_table)?;
|
||||
cx.export_function("databaseDropTable", database_drop_table)?;
|
||||
cx.export_function("tableSearch", JsQuery::js_search)?;
|
||||
cx.export_function("tableCreate", JsTable::js_create)?;
|
||||
cx.export_function("tableAdd", JsTable::js_add)?;
|
||||
cx.export_function("tableCountRows", JsTable::js_count_rows)?;
|
||||
cx.export_function("tableDelete", JsTable::js_delete)?;
|
||||
cx.export_function("tableUpdate", JsTable::js_update)?;
|
||||
cx.export_function("tableMergeInsert", JsTable::js_merge_insert)?;
|
||||
cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
|
||||
cx.export_function("tableCompactFiles", JsTable::js_compact)?;
|
||||
cx.export_function("tableListIndices", JsTable::js_list_indices)?;
|
||||
cx.export_function("tableIndexStats", JsTable::js_index_stats)?;
|
||||
cx.export_function(
|
||||
"tableCreateScalarIndex",
|
||||
index::scalar::table_create_scalar_index,
|
||||
)?;
|
||||
cx.export_function(
|
||||
"tableCreateVectorIndex",
|
||||
index::vector::table_create_vector_index,
|
||||
)?;
|
||||
cx.export_function("tableSchema", JsTable::js_schema)?;
|
||||
cx.export_function("tableAddColumns", JsTable::js_add_columns)?;
|
||||
cx.export_function("tableAlterColumns", JsTable::js_alter_columns)?;
|
||||
cx.export_function("tableDropColumns", JsTable::js_drop_columns)?;
|
||||
cx.export_function("tableDropIndex", JsTable::js_drop_index)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
pub mod js_object_ext;
|
||||
@@ -1,72 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use neon::prelude::*;
|
||||
|
||||
// extends neon's [JsObject] with helper functions to extract properties
|
||||
pub trait JsObjectExt {
|
||||
fn get_opt_u32(&self, cx: &mut FunctionContext, key: &str) -> Result<Option<u32>>;
|
||||
fn get_usize(&self, cx: &mut FunctionContext, key: &str) -> Result<usize>;
|
||||
#[allow(dead_code)]
|
||||
fn get_opt_usize(&self, cx: &mut FunctionContext, key: &str) -> Result<Option<usize>>;
|
||||
}
|
||||
|
||||
impl JsObjectExt for JsObject {
|
||||
fn get_opt_u32(&self, cx: &mut FunctionContext, key: &str) -> Result<Option<u32>> {
|
||||
let val_opt = self
|
||||
.get_opt::<JsNumber, _, _>(cx, key)?
|
||||
.map(|s| f64_to_u32_safe(s.value(cx), key));
|
||||
val_opt.transpose()
|
||||
}
|
||||
|
||||
fn get_usize(&self, cx: &mut FunctionContext, key: &str) -> Result<usize> {
|
||||
let val = self.get::<JsNumber, _, _>(cx, key)?.value(cx);
|
||||
f64_to_usize_safe(val, key)
|
||||
}
|
||||
|
||||
fn get_opt_usize(&self, cx: &mut FunctionContext, key: &str) -> Result<Option<usize>> {
|
||||
let val_opt = self
|
||||
.get_opt::<JsNumber, _, _>(cx, key)?
|
||||
.map(|s| f64_to_usize_safe(s.value(cx), key));
|
||||
val_opt.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
fn f64_to_u32_safe(n: f64, key: &str) -> Result<u32> {
|
||||
use conv::*;
|
||||
|
||||
n.approx_as::<u32>().map_err(|e| match e {
|
||||
FloatError::NegOverflow(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: "must be > 0".to_string(),
|
||||
},
|
||||
FloatError::PosOverflow(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: format!("must be < {}", u32::MAX),
|
||||
},
|
||||
FloatError::NotANumber(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: "not a valid number".to_string(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn f64_to_usize_safe(n: f64, key: &str) -> Result<usize> {
|
||||
use conv::*;
|
||||
|
||||
n.approx_as::<usize>().map_err(|e| match e {
|
||||
FloatError::NegOverflow(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: "must be > 0".to_string(),
|
||||
},
|
||||
FloatError::PosOverflow(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: format!("must be < {}", usize::MAX),
|
||||
},
|
||||
FloatError::NotANumber(_) => Error::OutOfRange {
|
||||
name: key.into(),
|
||||
message: "not a valid number".to_string(),
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -1,138 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use std::convert::TryFrom;
|
||||
use std::ops::Deref;
|
||||
|
||||
use futures::{TryFutureExt, TryStreamExt};
|
||||
use lancedb::query::{ExecutableQuery, QueryBase, Select};
|
||||
use lancedb::DistanceType;
|
||||
use neon::context::FunctionContext;
|
||||
use neon::handle::Handle;
|
||||
use neon::prelude::*;
|
||||
|
||||
use crate::arrow::record_batch_to_buffer;
|
||||
use crate::error::ResultExt;
|
||||
use crate::neon_ext::js_object_ext::JsObjectExt;
|
||||
use crate::table::JsTable;
|
||||
use crate::{convert, runtime};
|
||||
|
||||
pub struct JsQuery {}
|
||||
|
||||
impl JsQuery {
|
||||
pub(crate) fn js_search(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
|
||||
let query_obj = cx.argument::<JsObject>(0)?;
|
||||
|
||||
let limit = query_obj
|
||||
.get_opt::<JsNumber, _, _>(&mut cx, "_limit")?
|
||||
.map(|value| {
|
||||
let limit = value.value(&mut cx);
|
||||
if limit <= 0.0 {
|
||||
panic!("Limit must be a positive integer");
|
||||
}
|
||||
limit as u64
|
||||
});
|
||||
let select = query_obj
|
||||
.get_opt::<JsArray, _, _>(&mut cx, "_select")?
|
||||
.map(|arr| {
|
||||
let js_array = arr.deref();
|
||||
let mut projection_vec: Vec<String> = Vec::new();
|
||||
for i in 0..js_array.len(&mut cx) {
|
||||
let entry: Handle<JsString> = js_array.get(&mut cx, i).unwrap();
|
||||
projection_vec.push(entry.value(&mut cx));
|
||||
}
|
||||
projection_vec
|
||||
});
|
||||
|
||||
let prefilter = query_obj
|
||||
.get::<JsBoolean, _, _>(&mut cx, "_prefilter")?
|
||||
.value(&mut cx);
|
||||
|
||||
let fast_search = query_obj
|
||||
.get_opt::<JsBoolean, _, _>(&mut cx, "_fastSearch")?
|
||||
.map(|val| val.value(&mut cx));
|
||||
|
||||
let is_electron = cx
|
||||
.argument::<JsBoolean>(1)
|
||||
.or_throw(&mut cx)?
|
||||
.value(&mut cx);
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let mut builder = table.query();
|
||||
if let Some(filter) = query_obj
|
||||
.get_opt::<JsString, _, _>(&mut cx, "_filter")?
|
||||
.map(|s| s.value(&mut cx))
|
||||
{
|
||||
builder = builder.only_if(filter);
|
||||
}
|
||||
if let Some(select) = select {
|
||||
builder = builder.select(Select::columns(select.as_slice()));
|
||||
}
|
||||
if let Some(limit) = limit {
|
||||
builder = builder.limit(limit as usize);
|
||||
};
|
||||
if let Some(true) = fast_search {
|
||||
builder = builder.fast_search();
|
||||
}
|
||||
|
||||
let query_vector = query_obj.get_opt::<JsArray, _, _>(&mut cx, "_queryVector")?;
|
||||
if let Some(query) = query_vector.map(|q| convert::js_array_to_vec(q.deref(), &mut cx)) {
|
||||
let mut vector_builder = builder.nearest_to(query).unwrap();
|
||||
if let Some(distance_type) = query_obj
|
||||
.get_opt::<JsString, _, _>(&mut cx, "_metricType")?
|
||||
.map(|s| s.value(&mut cx))
|
||||
.map(|s| DistanceType::try_from(s.as_str()).unwrap())
|
||||
{
|
||||
vector_builder = vector_builder.distance_type(distance_type);
|
||||
}
|
||||
|
||||
let nprobes = query_obj.get_usize(&mut cx, "_nprobes").or_throw(&mut cx)?;
|
||||
vector_builder = vector_builder.nprobes(nprobes);
|
||||
|
||||
if !prefilter {
|
||||
vector_builder = vector_builder.postfilter();
|
||||
}
|
||||
rt.spawn(async move {
|
||||
let results = vector_builder
|
||||
.execute()
|
||||
.and_then(|stream| {
|
||||
stream
|
||||
.try_collect::<Vec<_>>()
|
||||
.map_err(lancedb::error::Error::from)
|
||||
})
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let results = results.or_throw(&mut cx)?;
|
||||
let buffer = record_batch_to_buffer(results).or_throw(&mut cx)?;
|
||||
convert::new_js_buffer(buffer, &mut cx, is_electron)
|
||||
});
|
||||
});
|
||||
} else {
|
||||
rt.spawn(async move {
|
||||
let results = builder
|
||||
.execute()
|
||||
.and_then(|stream| {
|
||||
stream
|
||||
.try_collect::<Vec<_>>()
|
||||
.map_err(lancedb::error::Error::from)
|
||||
})
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let results = results.or_throw(&mut cx)?;
|
||||
let buffer = record_batch_to_buffer(results).or_throw(&mut cx)?;
|
||||
convert::new_js_buffer(buffer, &mut cx, is_electron)
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
}
|
||||
@@ -1,645 +0,0 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use std::ops::Deref;
|
||||
|
||||
use arrow_array::{RecordBatch, RecordBatchIterator};
|
||||
use lance::dataset::optimize::CompactionOptions;
|
||||
use lance::dataset::{ColumnAlteration, NewColumnTransform, WriteMode, WriteParams};
|
||||
use lancedb::table::{OptimizeAction, WriteOptions};
|
||||
|
||||
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
|
||||
use lancedb::table::Table as LanceDbTable;
|
||||
use neon::prelude::*;
|
||||
use neon::types::buffer::TypedArray;
|
||||
|
||||
use crate::error::ResultExt;
|
||||
use crate::{convert, runtime, JsDatabase};
|
||||
|
||||
pub struct JsTable {
|
||||
pub table: LanceDbTable,
|
||||
}
|
||||
|
||||
impl Finalize for JsTable {}
|
||||
|
||||
impl From<LanceDbTable> for JsTable {
|
||||
fn from(table: LanceDbTable) -> Self {
|
||||
Self { table }
|
||||
}
|
||||
}
|
||||
|
||||
impl JsTable {
|
||||
pub(crate) fn js_create(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let db = cx
|
||||
.this()
|
||||
.downcast_or_throw::<JsBox<JsDatabase>, _>(&mut cx)?;
|
||||
let table_name = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let buffer = cx.argument::<JsBuffer>(1)?;
|
||||
let (batches, schema) =
|
||||
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
|
||||
|
||||
// Write mode
|
||||
let mode = match cx.argument::<JsString>(2)?.value(&mut cx).as_str() {
|
||||
"overwrite" => WriteMode::Overwrite,
|
||||
"append" => WriteMode::Append,
|
||||
"create" => WriteMode::Create,
|
||||
_ => {
|
||||
return cx.throw_error("Table::create only supports 'overwrite' and 'create' modes")
|
||||
}
|
||||
};
|
||||
let params = WriteParams {
|
||||
mode,
|
||||
..WriteParams::default()
|
||||
};
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let channel = cx.channel();
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let database = db.database.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let batch_reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
|
||||
let table_rst = database
|
||||
.create_table(&table_name, batch_reader)
|
||||
.write_options(WriteOptions {
|
||||
lance_write_params: Some(params),
|
||||
})
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let table = table_rst.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(Self::from(table)))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_add(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let buffer = cx.argument::<JsBuffer>(0)?;
|
||||
let write_mode = cx.argument::<JsString>(1)?.value(&mut cx);
|
||||
let (batches, schema) =
|
||||
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let write_mode = match write_mode.as_str() {
|
||||
"create" => WriteMode::Create,
|
||||
"append" => WriteMode::Append,
|
||||
"overwrite" => WriteMode::Overwrite,
|
||||
s => return cx.throw_error(format!("invalid write mode {}", s)),
|
||||
};
|
||||
|
||||
let params = WriteParams {
|
||||
mode: write_mode,
|
||||
..WriteParams::default()
|
||||
};
|
||||
|
||||
rt.spawn(async move {
|
||||
let batch_reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
|
||||
let add_result = table
|
||||
.add(batch_reader)
|
||||
.write_options(WriteOptions {
|
||||
lance_write_params: Some(params),
|
||||
})
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
add_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(Self::from(table)))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_count_rows(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let filter = cx
|
||||
.argument_opt(0)
|
||||
.and_then(|filt| {
|
||||
if filt.is_a::<JsUndefined, _>(&mut cx) || filt.is_a::<JsNull, _>(&mut cx) {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
filt.downcast_or_throw::<JsString, _>(&mut cx)
|
||||
.map(|js_filt| js_filt.deref().value(&mut cx)),
|
||||
)
|
||||
}
|
||||
})
|
||||
.transpose()?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let num_rows_result = table.count_rows(filter).await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let num_rows = num_rows_result.or_throw(&mut cx)?;
|
||||
Ok(cx.number(num_rows as f64))
|
||||
});
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_delete(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let predicate = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let delete_result = table.delete(&predicate).await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
delete_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(Self::from(table)))
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_merge_insert(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let key = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let mut builder = table.merge_insert(&[&key]);
|
||||
if cx.argument::<JsBoolean>(1)?.value(&mut cx) {
|
||||
let filter = cx.argument_opt(2).unwrap();
|
||||
if filter.is_a::<JsNull, _>(&mut cx) {
|
||||
builder.when_matched_update_all(None);
|
||||
} else {
|
||||
let filter = filter
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?
|
||||
.deref()
|
||||
.value(&mut cx);
|
||||
builder.when_matched_update_all(Some(filter));
|
||||
}
|
||||
}
|
||||
if cx.argument::<JsBoolean>(3)?.value(&mut cx) {
|
||||
builder.when_not_matched_insert_all();
|
||||
}
|
||||
if cx.argument::<JsBoolean>(4)?.value(&mut cx) {
|
||||
let filter = cx.argument_opt(5).unwrap();
|
||||
if filter.is_a::<JsNull, _>(&mut cx) {
|
||||
builder.when_not_matched_by_source_delete(None);
|
||||
} else {
|
||||
let filter = filter
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?
|
||||
.deref()
|
||||
.value(&mut cx);
|
||||
builder.when_not_matched_by_source_delete(Some(filter));
|
||||
}
|
||||
}
|
||||
|
||||
let buffer = cx.argument::<JsBuffer>(6)?;
|
||||
let (batches, schema) =
|
||||
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
|
||||
|
||||
rt.spawn(async move {
|
||||
let new_data = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
|
||||
let merge_insert_result = builder.execute(Box::new(new_data)).await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
merge_insert_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(Self::from(table)))
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_update(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
|
||||
// create a vector of updates from the passed map
|
||||
let updates_arg = cx.argument::<JsObject>(1)?;
|
||||
let properties = updates_arg.get_own_property_names(&mut cx)?;
|
||||
let mut updates: Vec<(String, String)> =
|
||||
Vec::with_capacity(properties.len(&mut cx) as usize);
|
||||
|
||||
let len_properties = properties.len(&mut cx);
|
||||
for i in 0..len_properties {
|
||||
let property = properties
|
||||
.get_value(&mut cx, i)?
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?;
|
||||
|
||||
let value = updates_arg
|
||||
.get_value(&mut cx, property)?
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?;
|
||||
|
||||
let property = property.value(&mut cx);
|
||||
let value = value.value(&mut cx);
|
||||
updates.push((property, value));
|
||||
}
|
||||
|
||||
// get the filter/predicate if the user passed one
|
||||
let predicate = cx.argument_opt(0);
|
||||
let predicate = predicate.unwrap().downcast::<JsString, _>(&mut cx);
|
||||
let predicate = match predicate {
|
||||
Ok(_) => {
|
||||
let val = predicate.map(|s| s.value(&mut cx)).unwrap();
|
||||
Some(val)
|
||||
}
|
||||
Err(_) => {
|
||||
// if the predicate is not string, check it's null otherwise an invalid
|
||||
// type was passed
|
||||
cx.argument::<JsNull>(0)?;
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
rt.spawn(async move {
|
||||
let updates_arg = updates
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let predicate = predicate.as_deref();
|
||||
|
||||
let mut update_op = table.update();
|
||||
if let Some(predicate) = predicate {
|
||||
update_op = update_op.only_if(predicate);
|
||||
}
|
||||
for (column, value) in updates_arg {
|
||||
update_op = update_op.column(column, value);
|
||||
}
|
||||
let update_result = update_op.execute().await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
update_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(Self::from(table)))
|
||||
})
|
||||
});
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_cleanup(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let table = js_table.table.clone();
|
||||
let channel = cx.channel();
|
||||
|
||||
let older_than: i64 = cx
|
||||
.argument_opt(0)
|
||||
.and_then(|val| val.downcast::<JsNumber, _>(&mut cx).ok())
|
||||
.map(|val| val.value(&mut cx) as i64)
|
||||
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
|
||||
let older_than = chrono::Duration::try_minutes(older_than).unwrap();
|
||||
let delete_unverified: Option<bool> = Some(
|
||||
cx.argument_opt(1)
|
||||
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
|
||||
.map(|val| val.value(&mut cx))
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
let error_if_tagged_old_versions: Option<bool> = Some(
|
||||
cx.argument_opt(2)
|
||||
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
|
||||
.map(|val| val.value(&mut cx))
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
|
||||
rt.spawn(async move {
|
||||
let stats = table
|
||||
.optimize(OptimizeAction::Prune {
|
||||
older_than: Some(older_than),
|
||||
delete_unverified,
|
||||
error_if_tagged_old_versions,
|
||||
})
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let stats = stats.or_throw(&mut cx)?;
|
||||
|
||||
let prune_stats = stats.prune.as_ref().expect("Prune stats missing");
|
||||
let output_metrics = JsObject::new(&mut cx);
|
||||
let bytes_removed = cx.number(prune_stats.bytes_removed as f64);
|
||||
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
|
||||
|
||||
let old_versions = cx.number(prune_stats.old_versions as f64);
|
||||
output_metrics.set(&mut cx, "oldVersions", old_versions)?;
|
||||
|
||||
let output_table = cx.boxed(Self::from(table));
|
||||
|
||||
let output = JsObject::new(&mut cx);
|
||||
output.set(&mut cx, "metrics", output_metrics)?;
|
||||
output.set(&mut cx, "newTable", output_table)?;
|
||||
|
||||
Ok(output)
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_compact(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let table = js_table.table.clone();
|
||||
let channel = cx.channel();
|
||||
|
||||
let js_options = cx.argument::<JsObject>(0)?;
|
||||
let mut options = CompactionOptions::default();
|
||||
|
||||
if let Some(target_rows) =
|
||||
js_options.get_opt::<JsNumber, _, _>(&mut cx, "targetRowsPerFragment")?
|
||||
{
|
||||
options.target_rows_per_fragment = target_rows.value(&mut cx) as usize;
|
||||
}
|
||||
if let Some(max_per_group) =
|
||||
js_options.get_opt::<JsNumber, _, _>(&mut cx, "maxRowsPerGroup")?
|
||||
{
|
||||
options.max_rows_per_group = max_per_group.value(&mut cx) as usize;
|
||||
}
|
||||
if let Some(materialize_deletions) =
|
||||
js_options.get_opt::<JsBoolean, _, _>(&mut cx, "materializeDeletions")?
|
||||
{
|
||||
options.materialize_deletions = materialize_deletions.value(&mut cx);
|
||||
}
|
||||
if let Some(materialize_deletions_threshold) =
|
||||
js_options.get_opt::<JsNumber, _, _>(&mut cx, "materializeDeletionsThreshold")?
|
||||
{
|
||||
options.materialize_deletions_threshold =
|
||||
materialize_deletions_threshold.value(&mut cx) as f32;
|
||||
}
|
||||
if let Some(num_threads) = js_options.get_opt::<JsNumber, _, _>(&mut cx, "numThreads")? {
|
||||
options.num_threads = Some(num_threads.value(&mut cx) as usize);
|
||||
}
|
||||
|
||||
rt.spawn(async move {
|
||||
let stats = table
|
||||
.optimize(OptimizeAction::Compact {
|
||||
options,
|
||||
remap_options: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let stats = stats.or_throw(&mut cx)?;
|
||||
let stats = stats.compaction.as_ref().expect("Compact stats missing");
|
||||
|
||||
let output_metrics = JsObject::new(&mut cx);
|
||||
let fragments_removed = cx.number(stats.fragments_removed as f64);
|
||||
output_metrics.set(&mut cx, "fragmentsRemoved", fragments_removed)?;
|
||||
|
||||
let fragments_added = cx.number(stats.fragments_added as f64);
|
||||
output_metrics.set(&mut cx, "fragmentsAdded", fragments_added)?;
|
||||
|
||||
let files_removed = cx.number(stats.files_removed as f64);
|
||||
output_metrics.set(&mut cx, "filesRemoved", files_removed)?;
|
||||
|
||||
let files_added = cx.number(stats.files_added as f64);
|
||||
output_metrics.set(&mut cx, "filesAdded", files_added)?;
|
||||
|
||||
let output_table = cx.boxed(Self::from(table));
|
||||
|
||||
let output = JsObject::new(&mut cx);
|
||||
output.set(&mut cx, "metrics", output_metrics)?;
|
||||
output.set(&mut cx, "newTable", output_table)?;
|
||||
|
||||
Ok(output)
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_list_indices(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
// let predicate = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let indices = table.as_native().unwrap().load_indices().await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let indices = indices.or_throw(&mut cx)?;
|
||||
|
||||
let output = JsArray::new(&mut cx, indices.len() as u32);
|
||||
for (i, index) in indices.iter().enumerate() {
|
||||
let js_index = JsObject::new(&mut cx);
|
||||
let index_name = cx.string(index.index_name.clone());
|
||||
js_index.set(&mut cx, "name", index_name)?;
|
||||
|
||||
let index_uuid = cx.string(index.index_uuid.clone());
|
||||
js_index.set(&mut cx, "uuid", index_uuid)?;
|
||||
|
||||
let js_index_columns = JsArray::new(&mut cx, index.columns.len() as u32);
|
||||
for (j, column) in index.columns.iter().enumerate() {
|
||||
let js_column = cx.string(column.clone());
|
||||
js_index_columns.set(&mut cx, j as u32, js_column)?;
|
||||
}
|
||||
js_index.set(&mut cx, "columns", js_index_columns)?;
|
||||
|
||||
output.set(&mut cx, i as u32, js_index)?;
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_index_stats(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let index_name = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let load_stats = table.index_stats(index_name).await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let stats = load_stats.or_throw(&mut cx)?;
|
||||
|
||||
if let Some(stats) = stats {
|
||||
let output = JsObject::new(&mut cx);
|
||||
let num_indexed_rows = cx.number(stats.num_indexed_rows as f64);
|
||||
output.set(&mut cx, "numIndexedRows", num_indexed_rows)?;
|
||||
let num_unindexed_rows = cx.number(stats.num_unindexed_rows as f64);
|
||||
output.set(&mut cx, "numUnindexedRows", num_unindexed_rows)?;
|
||||
if let Some(distance_type) = stats.distance_type {
|
||||
let distance_type = cx.string(distance_type.to_string());
|
||||
output.set(&mut cx, "distanceType", distance_type)?;
|
||||
}
|
||||
let index_type = cx.string(stats.index_type.to_string());
|
||||
output.set(&mut cx, "indexType", index_type)?;
|
||||
|
||||
if let Some(num_indices) = stats.num_indices {
|
||||
let num_indices = cx.number(num_indices as f64);
|
||||
output.set(&mut cx, "numIndices", num_indices)?;
|
||||
}
|
||||
|
||||
Ok(output.as_value(&mut cx))
|
||||
} else {
|
||||
Ok(JsNull::new(&mut cx).as_value(&mut cx))
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_schema(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let is_electron = cx
|
||||
.argument::<JsBoolean>(0)
|
||||
.or_throw(&mut cx)?
|
||||
.value(&mut cx);
|
||||
|
||||
rt.spawn(async move {
|
||||
let schema = table.schema().await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
let schema = schema.or_throw(&mut cx)?;
|
||||
let batches = vec![RecordBatch::new_empty(schema)];
|
||||
let buffer = record_batch_to_buffer(batches).or_throw(&mut cx)?;
|
||||
convert::new_js_buffer(buffer, &mut cx, is_electron)
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_add_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let expressions = cx
|
||||
.argument::<JsArray>(0)?
|
||||
.to_vec(&mut cx)?
|
||||
.into_iter()
|
||||
.map(|val| {
|
||||
let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
|
||||
let name = obj.get::<JsString, _, _>(&mut cx, "name")?.value(&mut cx);
|
||||
let sql = obj
|
||||
.get::<JsString, _, _>(&mut cx, "valueSql")?
|
||||
.value(&mut cx);
|
||||
Ok((name, sql))
|
||||
})
|
||||
.collect::<NeonResult<Vec<(String, String)>>>()?;
|
||||
|
||||
let transforms = NewColumnTransform::SqlExpressions(expressions);
|
||||
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let result = table.add_columns(transforms, None).await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
result.or_throw(&mut cx)?;
|
||||
Ok(cx.undefined())
|
||||
})
|
||||
});
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_alter_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let alterations = cx
|
||||
.argument::<JsArray>(0)?
|
||||
.to_vec(&mut cx)?
|
||||
.into_iter()
|
||||
.map(|val| {
|
||||
let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
|
||||
let path = obj.get::<JsString, _, _>(&mut cx, "path")?.value(&mut cx);
|
||||
let rename = obj
|
||||
.get_opt::<JsString, _, _>(&mut cx, "rename")?
|
||||
.map(|val| val.value(&mut cx));
|
||||
let nullable = obj
|
||||
.get_opt::<JsBoolean, _, _>(&mut cx, "nullable")?
|
||||
.map(|val| val.value(&mut cx));
|
||||
// TODO: support data type here. Will need to do some serialization/deserialization
|
||||
|
||||
if rename.is_none() && nullable.is_none() {
|
||||
return cx.throw_error("At least one of 'name' or 'nullable' must be provided");
|
||||
}
|
||||
|
||||
Ok(ColumnAlteration {
|
||||
path,
|
||||
rename,
|
||||
nullable,
|
||||
// TODO: wire up this field
|
||||
data_type: None,
|
||||
})
|
||||
})
|
||||
.collect::<NeonResult<Vec<ColumnAlteration>>>()?;
|
||||
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let result = table.alter_columns(&alterations).await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
result.or_throw(&mut cx)?;
|
||||
Ok(cx.undefined())
|
||||
})
|
||||
});
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_drop_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let columns = cx
|
||||
.argument::<JsArray>(0)?
|
||||
.to_vec(&mut cx)?
|
||||
.into_iter()
|
||||
.map(|val| {
|
||||
Ok(val
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?
|
||||
.value(&mut cx))
|
||||
})
|
||||
.collect::<NeonResult<Vec<String>>>()?;
|
||||
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
rt.spawn(async move {
|
||||
let col_refs = columns.iter().map(|s| s.as_str()).collect::<Vec<_>>();
|
||||
let result = table.drop_columns(&col_refs).await;
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
result.or_throw(&mut cx)?;
|
||||
Ok(cx.undefined())
|
||||
})
|
||||
});
|
||||
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_drop_index(_cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
todo!("not implemented")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user