diff --git a/Cargo.toml b/Cargo.toml index 7f336d10..b97e5502 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,11 +20,11 @@ keywords = ["lancedb", "lance", "database", "vector", "search"] categories = ["database-implementations"] [workspace.dependencies] -lance = { "version" = "=0.11.1", "features" = ["dynamodb"] } -lance-index = { "version" = "=0.11.1" } -lance-linalg = { "version" = "=0.11.1" } -lance-testing = { "version" = "=0.11.1" } -lance-datafusion = { "version" = "=0.11.1" } +lance = { "version" = "=0.12.0", "features" = ["dynamodb"] } +lance-index = { "version" = "=0.12.0" } +lance-linalg = { "version" = "=0.12.0" } +lance-testing = { "version" = "=0.12.0" } +lance-datafusion = { "version" = "=0.12.0" } # Note that this one does not include pyarrow arrow = { version = "51.0", optional = false } arrow-array = "51.0" diff --git a/nodejs/__test__/connection.test.ts b/nodejs/__test__/connection.test.ts index dc38d0c2..5e05f103 100644 --- a/nodejs/__test__/connection.test.ts +++ b/nodejs/__test__/connection.test.ts @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { Field, Float64, Schema } from "apache-arrow"; import * as tmp from "tmp"; -import { Connection, connect } from "../lancedb"; +import { Connection, Table, connect } from "../lancedb"; describe("when connecting", () => { let tmpDir: tmp.DirResult; @@ -86,4 +87,39 @@ describe("given a connection", () => { tables = await db.tableNames({ startAfter: "a" }); expect(tables).toEqual(["b", "c"]); }); + + it("should create tables in v2 mode", async () => { + const db = await connect(tmpDir.name); + const data = [...Array(10000).keys()].map((i) => ({ id: i })); + + // Create in v1 mode + let table = await db.createTable("test", data); + + const isV2 = async (table: Table) => { + const data = await table.query().toArrow({ maxBatchLength: 100000 }); + console.log(data.batches.length); + return data.batches.length < 5; + }; + + await expect(isV2(table)).resolves.toBe(false); + + // Create in v2 mode + table = await db.createTable("test_v2", data, { useLegacyFormat: false }); + + await expect(isV2(table)).resolves.toBe(true); + + await table.add(data); + + await expect(isV2(table)).resolves.toBe(true); + + // Create empty in v2 mode + const schema = new Schema([new Field("id", new Float64(), true)]); + + table = await db.createEmptyTable("test_v2_empty", schema, { + useLegacyFormat: false, + }); + + await table.add(data); + await expect(isV2(table)).resolves.toBe(true); + }); }); diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 8948d869..e8f795db 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -71,6 +71,12 @@ export interface CreateTableOptions { * The available options are described at https://lancedb.github.io/lancedb/guides/storage/ */ storageOptions?: Record; + /** + * If true then data files will be written with the legacy format + * + * The default is true while the new format is in beta + */ + useLegacyFormat?: boolean; schema?: Schema; embeddingFunction?: EmbeddingFunctionConfig; } @@ -221,6 +227,7 @@ export class Connection { buf, mode, cleanseStorageOptions(options?.storageOptions), + options?.useLegacyFormat, ); return new Table(innerTable); @@ -256,6 +263,7 @@ export class Connection { buf, mode, cleanseStorageOptions(options?.storageOptions), + options?.useLegacyFormat, ); return new Table(innerTable); } diff --git a/nodejs/lancedb/query.ts b/nodejs/lancedb/query.ts index dea06155..13604e7c 100644 --- a/nodejs/lancedb/query.ts +++ b/nodejs/lancedb/query.ts @@ -55,6 +55,39 @@ export class RecordBatchIterator implements AsyncIterator { } /* eslint-enable */ +class RecordBatchIterable< + NativeQueryType extends NativeQuery | NativeVectorQuery, +> implements AsyncIterable +{ + private inner: NativeQueryType; + private options?: QueryExecutionOptions; + + constructor(inner: NativeQueryType, options?: QueryExecutionOptions) { + this.inner = inner; + this.options = options; + } + + // biome-ignore lint/suspicious/noExplicitAny: skip + [Symbol.asyncIterator](): AsyncIterator, any, undefined> { + return new RecordBatchIterator( + this.inner.execute(this.options?.maxBatchLength), + ); + } +} + +/** + * Options that control the behavior of a particular query execution + */ +export interface QueryExecutionOptions { + /** + * The maximum number of rows to return in a single batch + * + * Batches may have fewer rows if the underlying data is stored + * in smaller chunks. + */ + maxBatchLength?: number; +} + /** Common methods supported by all query types */ export class QueryBase< NativeQueryType extends NativeQuery | NativeVectorQuery, @@ -141,8 +174,10 @@ export class QueryBase< return this as unknown as QueryType; } - protected nativeExecute(): Promise { - return this.inner.execute(); + protected nativeExecute( + options?: Partial, + ): Promise { + return this.inner.execute(options?.maxBatchLength); } /** @@ -156,8 +191,10 @@ export class QueryBase< * single query) * */ - protected execute(): RecordBatchIterator { - return new RecordBatchIterator(this.nativeExecute()); + protected execute( + options?: Partial, + ): RecordBatchIterator { + return new RecordBatchIterator(this.nativeExecute(options)); } // biome-ignore lint/suspicious/noExplicitAny: skip @@ -167,9 +204,9 @@ export class QueryBase< } /** Collect the results as an Arrow @see {@link ArrowTable}. */ - async toArrow(): Promise { + async toArrow(options?: Partial): Promise { const batches = []; - for await (const batch of this) { + for await (const batch of new RecordBatchIterable(this.inner, options)) { batches.push(batch); } return new ArrowTable(batches); @@ -177,9 +214,8 @@ export class QueryBase< /** Collect the results as an array of objects. */ // biome-ignore lint/suspicious/noExplicitAny: arrow.toArrow() returns any[] - async toArray(): Promise { - const tbl = await this.toArrow(); - + async toArray(options?: Partial): Promise { + const tbl = await this.toArrow(options); return tbl.toArray(); } } diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index 5b45a580..88300184 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -126,6 +126,7 @@ impl Connection { buf: Buffer, mode: String, storage_options: Option>, + use_legacy_format: Option, ) -> napi::Result { let batches = ipc_file_to_batches(buf.to_vec()) .map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?; @@ -136,6 +137,9 @@ impl Connection { builder = builder.storage_option(key, value); } } + if let Some(use_legacy_format) = use_legacy_format { + builder = builder.use_legacy_format(use_legacy_format); + } let tbl = builder .execute() .await @@ -150,6 +154,7 @@ impl Connection { schema_buf: Buffer, mode: String, storage_options: Option>, + use_legacy_format: Option, ) -> napi::Result
{ let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| { napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e)) @@ -164,6 +169,9 @@ impl Connection { builder = builder.storage_option(key, value); } } + if let Some(use_legacy_format) = use_legacy_format { + builder = builder.use_legacy_format(use_legacy_format); + } let tbl = builder .execute() .await diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index a3289b24..423893f8 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -56,6 +56,7 @@ pub enum WriteMode { /// Write options when creating a Table. #[napi(object)] pub struct WriteOptions { + /// Write mode for writing to a table. pub mode: Option, } diff --git a/nodejs/src/query.rs b/nodejs/src/query.rs index f74faa86..a2046ce6 100644 --- a/nodejs/src/query.rs +++ b/nodejs/src/query.rs @@ -15,6 +15,7 @@ use lancedb::query::ExecutableQuery; use lancedb::query::Query as LanceDbQuery; use lancedb::query::QueryBase; +use lancedb::query::QueryExecutionOptions; use lancedb::query::Select; use lancedb::query::VectorQuery as LanceDbVectorQuery; use napi::bindgen_prelude::*; @@ -62,10 +63,21 @@ impl Query { } #[napi] - pub async fn execute(&self) -> napi::Result { - let inner_stream = self.inner.execute().await.map_err(|e| { - napi::Error::from_reason(format!("Failed to execute query stream: {}", e)) - })?; + pub async fn execute( + &self, + max_batch_length: Option, + ) -> napi::Result { + let mut execution_opts = QueryExecutionOptions::default(); + if let Some(max_batch_length) = max_batch_length { + execution_opts.max_batch_length = max_batch_length; + } + let inner_stream = self + .inner + .execute_with_options(execution_opts) + .await + .map_err(|e| { + napi::Error::from_reason(format!("Failed to execute query stream: {}", e)) + })?; Ok(RecordBatchIterator::new(inner_stream)) } } @@ -125,10 +137,21 @@ impl VectorQuery { } #[napi] - pub async fn execute(&self) -> napi::Result { - let inner_stream = self.inner.execute().await.map_err(|e| { - napi::Error::from_reason(format!("Failed to execute query stream: {}", e)) - })?; + pub async fn execute( + &self, + max_batch_length: Option, + ) -> napi::Result { + let mut execution_opts = QueryExecutionOptions::default(); + if let Some(max_batch_length) = max_batch_length { + execution_opts.max_batch_length = max_batch_length; + } + let inner_stream = self + .inner + .execute_with_options(execution_opts) + .await + .map_err(|e| { + napi::Error::from_reason(format!("Failed to execute query stream: {}", e)) + })?; Ok(RecordBatchIterator::new(inner_stream)) } } diff --git a/python/pyproject.toml b/python/pyproject.toml index 81a53149..829eb2fa 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -3,7 +3,7 @@ name = "lancedb" # version in Cargo.toml dependencies = [ "deprecation", - "pylance==0.11.1", + "pylance==0.12.0", "ratelimiter~=1.0", "requests>=2.31.0", "retry>=0.9.2", diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 0c6bd893..f930087d 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -24,6 +24,7 @@ class Connection(object): mode: str, data: pa.RecordBatchReader, storage_options: Optional[Dict[str, str]] = None, + use_legacy_format: Optional[bool] = None, ) -> Table: ... async def create_empty_table( self, @@ -31,6 +32,7 @@ class Connection(object): mode: str, schema: pa.Schema, storage_options: Optional[Dict[str, str]] = None, + use_legacy_format: Optional[bool] = None, ) -> Table: ... class Table: @@ -72,7 +74,7 @@ class Query: def select(self, columns: Tuple[str, str]): ... def limit(self, limit: int): ... def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ... - async def execute(self) -> RecordBatchStream: ... + async def execute(self, max_batch_legnth: Optional[int]) -> RecordBatchStream: ... class VectorQuery: async def execute(self) -> RecordBatchStream: ... diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 4bf66332..dc7ccd8d 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -558,6 +558,8 @@ class AsyncConnection(object): on_bad_vectors: Optional[str] = None, fill_value: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, + *, + use_legacy_format: Optional[bool] = None, ) -> AsyncTable: """Create an [AsyncTable][lancedb.table.AsyncTable] in the database. @@ -600,6 +602,9 @@ class AsyncConnection(object): connection will be inherited by the table, but can be overridden here. See available options at https://lancedb.github.io/lancedb/guides/storage/ + use_legacy_format: bool, optional, default True + If True, use the legacy format for the table. If False, use the new format. + The default is True while the new format is in beta. Returns @@ -761,7 +766,11 @@ class AsyncConnection(object): if data is None: new_table = await self._inner.create_empty_table( - name, mode, schema, storage_options=storage_options + name, + mode, + schema, + storage_options=storage_options, + use_legacy_format=use_legacy_format, ) else: data = data_to_reader(data, schema) @@ -770,6 +779,7 @@ class AsyncConnection(object): mode, data, storage_options=storage_options, + use_legacy_format=use_legacy_format, ) return AsyncTable(new_table) diff --git a/python/python/lancedb/query.py b/python/python/lancedb/query.py index a6f3efc5..35201de9 100644 --- a/python/python/lancedb/query.py +++ b/python/python/lancedb/query.py @@ -1113,11 +1113,22 @@ class AsyncQueryBase(object): self._inner.limit(limit) return self - async def to_batches(self) -> AsyncRecordBatchReader: + async def to_batches( + self, *, max_batch_length: Optional[int] = None + ) -> AsyncRecordBatchReader: """ Execute the query and return the results as an Apache Arrow RecordBatchReader. + + Parameters + ---------- + + max_batch_length: Optional[int] + The maximum number of selected records in a single RecordBatch object. + If not specified, a default batch length is used. + It is possible for batches to be smaller than the provided length if the + underlying data is stored in smaller chunks. """ - return AsyncRecordBatchReader(await self._inner.execute()) + return AsyncRecordBatchReader(await self._inner.execute(max_batch_length)) async def to_arrow(self) -> pa.Table: """ diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index 027a14ef..591b3c72 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -507,6 +507,52 @@ def test_empty_or_nonexistent_table(tmp_path): assert test.schema == test2.schema +@pytest.mark.asyncio +async def test_create_in_v2_mode(tmp_path): + def make_data(): + for i in range(10): + yield pa.record_batch([pa.array([x for x in range(1024)])], names=["x"]) + + def make_table(): + return pa.table([pa.array([x for x in range(10 * 1024)])], names=["x"]) + + schema = pa.schema([pa.field("x", pa.int64())]) + + db = await lancedb.connect_async(tmp_path) + + # Create table in v1 mode + tbl = await db.create_table("test", data=make_data(), schema=schema) + + async def is_in_v2_mode(tbl): + batches = await tbl.query().to_batches(max_batch_length=1024 * 10) + num_batches = 0 + async for batch in batches: + num_batches += 1 + return num_batches < 10 + + assert not await is_in_v2_mode(tbl) + + # Create table in v2 mode + tbl = await db.create_table( + "test_v2", data=make_data(), schema=schema, use_legacy_format=False + ) + + assert await is_in_v2_mode(tbl) + + # Add data (should remain in v2 mode) + await tbl.add(make_table()) + + assert await is_in_v2_mode(tbl) + + # Create empty table in v2 mode and add data + tbl = await db.create_table( + "test_empty_v2", data=None, schema=schema, use_legacy_format=False + ) + await tbl.add(make_table()) + + assert await is_in_v2_mode(tbl) + + def test_replace_index(tmp_path): db = lancedb.connect(uri=tmp_path) table = db.create_table( diff --git a/python/src/connection.rs b/python/src/connection.rs index bf55facd..39afc9f6 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -91,6 +91,7 @@ impl Connection { mode: &str, data: &PyAny, storage_options: Option>, + use_legacy_format: Option, ) -> PyResult<&'a PyAny> { let inner = self_.get_inner()?.clone(); @@ -103,6 +104,10 @@ impl Connection { builder = builder.storage_options(storage_options); } + if let Some(use_legacy_format) = use_legacy_format { + builder = builder.use_legacy_format(use_legacy_format); + } + future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; Ok(Table::new(table)) @@ -115,6 +120,7 @@ impl Connection { mode: &str, schema: &PyAny, storage_options: Option>, + use_legacy_format: Option, ) -> PyResult<&'a PyAny> { let inner = self_.get_inner()?.clone(); @@ -128,6 +134,10 @@ impl Connection { builder = builder.storage_options(storage_options); } + if let Some(use_legacy_format) = use_legacy_format { + builder = builder.use_legacy_format(use_legacy_format); + } + future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; Ok(Table::new(table)) diff --git a/python/src/query.rs b/python/src/query.rs index cdcac654..fadf2aeb 100644 --- a/python/src/query.rs +++ b/python/src/query.rs @@ -15,6 +15,7 @@ use arrow::array::make_array; use arrow::array::ArrayData; use arrow::pyarrow::FromPyArrow; +use lancedb::query::QueryExecutionOptions; use lancedb::query::{ ExecutableQuery, Query as LanceDbQuery, QueryBase, Select, VectorQuery as LanceDbVectorQuery, }; @@ -61,10 +62,14 @@ impl Query { Ok(VectorQuery { inner }) } - pub fn execute(self_: PyRef<'_, Self>) -> PyResult<&PyAny> { + pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option) -> PyResult<&PyAny> { let inner = self_.inner.clone(); future_into_py(self_.py(), async move { - let inner_stream = inner.execute().await.infer_error()?; + let mut opts = QueryExecutionOptions::default(); + if let Some(max_batch_length) = max_batch_length { + opts.max_batch_length = max_batch_length; + } + let inner_stream = inner.execute_with_options(opts).await.infer_error()?; Ok(RecordBatchStream::new(inner_stream)) }) } @@ -115,10 +120,14 @@ impl VectorQuery { self.inner = self.inner.clone().bypass_vector_index() } - pub fn execute(self_: PyRef<'_, Self>) -> PyResult<&PyAny> { + pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option) -> PyResult<&PyAny> { let inner = self_.inner.clone(); future_into_py(self_.py(), async move { - let inner_stream = inner.execute().await.infer_error()?; + let mut opts = QueryExecutionOptions::default(); + if let Some(max_batch_length) = max_batch_length { + opts.max_batch_length = max_batch_length; + } + let inner_stream = inner.execute_with_options(opts).await.infer_error()?; Ok(RecordBatchStream::new(inner_stream)) }) } diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 60a275f3..eaa79062 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -140,6 +140,7 @@ pub struct CreateTableBuilder { pub(crate) write_options: WriteOptions, pub(crate) table_definition: Option, pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc)>, + pub(crate) use_legacy_format: bool, } // Builder methods that only apply when we have initial data @@ -153,6 +154,7 @@ impl CreateTableBuilder { write_options: WriteOptions::default(), table_definition: None, embeddings: Vec::new(), + use_legacy_format: true, } } @@ -184,6 +186,7 @@ impl CreateTableBuilder { mode: self.mode, write_options: self.write_options, embeddings: self.embeddings, + use_legacy_format: self.use_legacy_format, }; Ok((data, builder)) } @@ -217,6 +220,7 @@ impl CreateTableBuilder { mode: CreateTableMode::default(), write_options: WriteOptions::default(), embeddings: Vec::new(), + use_legacy_format: false, } } @@ -278,6 +282,20 @@ impl CreateTableBuilder { } self } + + /// Set to true to use the v1 format for data files + /// + /// This is currently defaulted to true and can be set to false to opt-in + /// to the new format. This should only be used for experimentation and + /// evaluation. The new format is still in beta and may change in ways that + /// are not backwards compatible. + /// + /// Once the new format is stable, the default will change to `false` for + /// several releases and then eventually this option will be removed. + pub fn use_legacy_format(mut self, use_legacy_format: bool) -> Self { + self.use_legacy_format = use_legacy_format; + self + } } #[derive(Clone, Debug)] @@ -943,6 +961,7 @@ impl ConnectionInternal for Database { if matches!(&options.mode, CreateTableMode::Overwrite) { write_params.mode = WriteMode::Overwrite; } + write_params.use_legacy_format = options.use_legacy_format; match NativeTable::create( &table_uri, @@ -1040,8 +1059,12 @@ impl ConnectionInternal for Database { #[cfg(test)] mod tests { use arrow_schema::{DataType, Field, Schema}; + use futures::TryStreamExt; + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::tempdir; + use crate::query::{ExecutableQuery, QueryExecutionOptions}; + use super::*; #[tokio::test] @@ -1146,6 +1169,58 @@ mod tests { assert_eq!(tables, vec!["table1".to_owned()]); } + fn make_data() -> impl RecordBatchReader + Send + 'static { + let id = Box::new(IncrementingInt32::new().named("id".to_string())); + BatchGenerator::new().col(id).batches(10, 2000) + } + + #[tokio::test] + async fn test_create_table_v2() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let db = connect(uri).execute().await.unwrap(); + + let tbl = db + .create_table("v1_test", make_data()) + .execute() + .await + .unwrap(); + + // In v1 the row group size will trump max_batch_length + let batches = tbl + .query() + .execute_with_options(QueryExecutionOptions { + max_batch_length: 50000, + }) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert_eq!(batches.len(), 20); + + let tbl = db + .create_table("v2_test", make_data()) + .use_legacy_format(false) + .execute() + .await + .unwrap(); + + // In v2 the page size is much bigger than 50k so we should get a single batch + let batches = tbl + .query() + .execute_with_options(QueryExecutionOptions { + max_batch_length: 50000, + }) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + } + #[tokio::test] async fn drop_table() { let tmp_dir = tempdir().unwrap(); diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 3bd0fc49..afc486e9 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -233,7 +233,8 @@ pub struct WriteOptions { // pub on_bad_vectors: BadVectorHandling, /// Advanced parameters that can be used to customize table creation /// - /// If set, these will take precedence over any overlapping `OpenTableBuilder` options + /// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take + /// precedence over their counterparts in `WriteOptions` (e.g. [WriteParams::mode]). pub lance_write_params: Option, } diff --git a/rust/lancedb/tests/embedding_registry_test.rs b/rust/lancedb/tests/embedding_registry_test.rs index f2e8c9de..3fd54bfb 100644 --- a/rust/lancedb/tests/embedding_registry_test.rs +++ b/rust/lancedb/tests/embedding_registry_test.rs @@ -318,6 +318,7 @@ impl EmbeddingFunction for MockEmbed { Ok(Arc::new(arr)) } + #[allow(unused_variables)] fn compute_query_embeddings(&self, input: Arc) -> Result> { unimplemented!() }