feat: make it possible to opt in to using the v2 format (#1352)

This also exposed the max_batch_length configuration option in
python/node (it was needed to verify if we are actually in v2 mode or
not)
This commit is contained in:
Weston Pace
2024-06-04 21:52:14 -07:00
committed by GitHub
parent d39e7d23f4
commit d5586c9c32
17 changed files with 310 additions and 33 deletions

View File

@@ -20,11 +20,11 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]
[workspace.dependencies]
lance = { "version" = "=0.11.1", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.11.1" }
lance-linalg = { "version" = "=0.11.1" }
lance-testing = { "version" = "=0.11.1" }
lance-datafusion = { "version" = "=0.11.1" }
lance = { "version" = "=0.12.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.12.0" }
lance-linalg = { "version" = "=0.12.0" }
lance-testing = { "version" = "=0.12.0" }
lance-datafusion = { "version" = "=0.12.0" }
# Note that this one does not include pyarrow
arrow = { version = "51.0", optional = false }
arrow-array = "51.0"

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { Field, Float64, Schema } from "apache-arrow";
import * as tmp from "tmp";
import { Connection, connect } from "../lancedb";
import { Connection, Table, connect } from "../lancedb";
describe("when connecting", () => {
let tmpDir: tmp.DirResult;
@@ -86,4 +87,39 @@ describe("given a connection", () => {
tables = await db.tableNames({ startAfter: "a" });
expect(tables).toEqual(["b", "c"]);
});
it("should create tables in v2 mode", async () => {
const db = await connect(tmpDir.name);
const data = [...Array(10000).keys()].map((i) => ({ id: i }));
// Create in v1 mode
let table = await db.createTable("test", data);
const isV2 = async (table: Table) => {
const data = await table.query().toArrow({ maxBatchLength: 100000 });
console.log(data.batches.length);
return data.batches.length < 5;
};
await expect(isV2(table)).resolves.toBe(false);
// Create in v2 mode
table = await db.createTable("test_v2", data, { useLegacyFormat: false });
await expect(isV2(table)).resolves.toBe(true);
await table.add(data);
await expect(isV2(table)).resolves.toBe(true);
// Create empty in v2 mode
const schema = new Schema([new Field("id", new Float64(), true)]);
table = await db.createEmptyTable("test_v2_empty", schema, {
useLegacyFormat: false,
});
await table.add(data);
await expect(isV2(table)).resolves.toBe(true);
});
});

View File

@@ -71,6 +71,12 @@ export interface CreateTableOptions {
* The available options are described at https://lancedb.github.io/lancedb/guides/storage/
*/
storageOptions?: Record<string, string>;
/**
* If true then data files will be written with the legacy format
*
* The default is true while the new format is in beta
*/
useLegacyFormat?: boolean;
schema?: Schema;
embeddingFunction?: EmbeddingFunctionConfig;
}
@@ -221,6 +227,7 @@ export class Connection {
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
options?.useLegacyFormat,
);
return new Table(innerTable);
@@ -256,6 +263,7 @@ export class Connection {
buf,
mode,
cleanseStorageOptions(options?.storageOptions),
options?.useLegacyFormat,
);
return new Table(innerTable);
}

View File

@@ -55,6 +55,39 @@ export class RecordBatchIterator implements AsyncIterator<RecordBatch> {
}
/* eslint-enable */
class RecordBatchIterable<
NativeQueryType extends NativeQuery | NativeVectorQuery,
> implements AsyncIterable<RecordBatch>
{
private inner: NativeQueryType;
private options?: QueryExecutionOptions;
constructor(inner: NativeQueryType, options?: QueryExecutionOptions) {
this.inner = inner;
this.options = options;
}
// biome-ignore lint/suspicious/noExplicitAny: skip
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<any>, any, undefined> {
return new RecordBatchIterator(
this.inner.execute(this.options?.maxBatchLength),
);
}
}
/**
* Options that control the behavior of a particular query execution
*/
export interface QueryExecutionOptions {
/**
* The maximum number of rows to return in a single batch
*
* Batches may have fewer rows if the underlying data is stored
* in smaller chunks.
*/
maxBatchLength?: number;
}
/** Common methods supported by all query types */
export class QueryBase<
NativeQueryType extends NativeQuery | NativeVectorQuery,
@@ -141,8 +174,10 @@ export class QueryBase<
return this as unknown as QueryType;
}
protected nativeExecute(): Promise<NativeBatchIterator> {
return this.inner.execute();
protected nativeExecute(
options?: Partial<QueryExecutionOptions>,
): Promise<NativeBatchIterator> {
return this.inner.execute(options?.maxBatchLength);
}
/**
@@ -156,8 +191,10 @@ export class QueryBase<
* single query)
*
*/
protected execute(): RecordBatchIterator {
return new RecordBatchIterator(this.nativeExecute());
protected execute(
options?: Partial<QueryExecutionOptions>,
): RecordBatchIterator {
return new RecordBatchIterator(this.nativeExecute(options));
}
// biome-ignore lint/suspicious/noExplicitAny: skip
@@ -167,9 +204,9 @@ export class QueryBase<
}
/** Collect the results as an Arrow @see {@link ArrowTable}. */
async toArrow(): Promise<ArrowTable> {
async toArrow(options?: Partial<QueryExecutionOptions>): Promise<ArrowTable> {
const batches = [];
for await (const batch of this) {
for await (const batch of new RecordBatchIterable(this.inner, options)) {
batches.push(batch);
}
return new ArrowTable(batches);
@@ -177,9 +214,8 @@ export class QueryBase<
/** Collect the results as an array of objects. */
// biome-ignore lint/suspicious/noExplicitAny: arrow.toArrow() returns any[]
async toArray(): Promise<any[]> {
const tbl = await this.toArrow();
async toArray(options?: Partial<QueryExecutionOptions>): Promise<any[]> {
const tbl = await this.toArrow(options);
return tbl.toArray();
}
}

View File

@@ -126,6 +126,7 @@ impl Connection {
buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
@@ -136,6 +137,9 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
}
let tbl = builder
.execute()
.await
@@ -150,6 +154,7 @@ impl Connection {
schema_buf: Buffer,
mode: String,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> napi::Result<Table> {
let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| {
napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e))
@@ -164,6 +169,9 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
}
let tbl = builder
.execute()
.await

View File

@@ -56,6 +56,7 @@ pub enum WriteMode {
/// Write options when creating a Table.
#[napi(object)]
pub struct WriteOptions {
/// Write mode for writing to a table.
pub mode: Option<WriteMode>,
}

View File

@@ -15,6 +15,7 @@
use lancedb::query::ExecutableQuery;
use lancedb::query::Query as LanceDbQuery;
use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::Select;
use lancedb::query::VectorQuery as LanceDbVectorQuery;
use napi::bindgen_prelude::*;
@@ -62,10 +63,21 @@ impl Query {
}
#[napi]
pub async fn execute(&self) -> napi::Result<RecordBatchIterator> {
let inner_stream = self.inner.execute().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
pub async fn execute(
&self,
max_batch_length: Option<u32>,
) -> napi::Result<RecordBatchIterator> {
let mut execution_opts = QueryExecutionOptions::default();
if let Some(max_batch_length) = max_batch_length {
execution_opts.max_batch_length = max_batch_length;
}
let inner_stream = self
.inner
.execute_with_options(execution_opts)
.await
.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
Ok(RecordBatchIterator::new(inner_stream))
}
}
@@ -125,10 +137,21 @@ impl VectorQuery {
}
#[napi]
pub async fn execute(&self) -> napi::Result<RecordBatchIterator> {
let inner_stream = self.inner.execute().await.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
pub async fn execute(
&self,
max_batch_length: Option<u32>,
) -> napi::Result<RecordBatchIterator> {
let mut execution_opts = QueryExecutionOptions::default();
if let Some(max_batch_length) = max_batch_length {
execution_opts.max_batch_length = max_batch_length;
}
let inner_stream = self
.inner
.execute_with_options(execution_opts)
.await
.map_err(|e| {
napi::Error::from_reason(format!("Failed to execute query stream: {}", e))
})?;
Ok(RecordBatchIterator::new(inner_stream))
}
}

View File

@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml
dependencies = [
"deprecation",
"pylance==0.11.1",
"pylance==0.12.0",
"ratelimiter~=1.0",
"requests>=2.31.0",
"retry>=0.9.2",

View File

@@ -24,6 +24,7 @@ class Connection(object):
mode: str,
data: pa.RecordBatchReader,
storage_options: Optional[Dict[str, str]] = None,
use_legacy_format: Optional[bool] = None,
) -> Table: ...
async def create_empty_table(
self,
@@ -31,6 +32,7 @@ class Connection(object):
mode: str,
schema: pa.Schema,
storage_options: Optional[Dict[str, str]] = None,
use_legacy_format: Optional[bool] = None,
) -> Table: ...
class Table:
@@ -72,7 +74,7 @@ class Query:
def select(self, columns: Tuple[str, str]): ...
def limit(self, limit: int): ...
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
async def execute(self) -> RecordBatchStream: ...
async def execute(self, max_batch_legnth: Optional[int]) -> RecordBatchStream: ...
class VectorQuery:
async def execute(self) -> RecordBatchStream: ...

View File

@@ -558,6 +558,8 @@ class AsyncConnection(object):
on_bad_vectors: Optional[str] = None,
fill_value: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
*,
use_legacy_format: Optional[bool] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -600,6 +602,9 @@ class AsyncConnection(object):
connection will be inherited by the table, but can be overridden here.
See available options at
https://lancedb.github.io/lancedb/guides/storage/
use_legacy_format: bool, optional, default True
If True, use the legacy format for the table. If False, use the new format.
The default is True while the new format is in beta.
Returns
@@ -761,7 +766,11 @@ class AsyncConnection(object):
if data is None:
new_table = await self._inner.create_empty_table(
name, mode, schema, storage_options=storage_options
name,
mode,
schema,
storage_options=storage_options,
use_legacy_format=use_legacy_format,
)
else:
data = data_to_reader(data, schema)
@@ -770,6 +779,7 @@ class AsyncConnection(object):
mode,
data,
storage_options=storage_options,
use_legacy_format=use_legacy_format,
)
return AsyncTable(new_table)

View File

@@ -1113,11 +1113,22 @@ class AsyncQueryBase(object):
self._inner.limit(limit)
return self
async def to_batches(self) -> AsyncRecordBatchReader:
async def to_batches(
self, *, max_batch_length: Optional[int] = None
) -> AsyncRecordBatchReader:
"""
Execute the query and return the results as an Apache Arrow RecordBatchReader.
Parameters
----------
max_batch_length: Optional[int]
The maximum number of selected records in a single RecordBatch object.
If not specified, a default batch length is used.
It is possible for batches to be smaller than the provided length if the
underlying data is stored in smaller chunks.
"""
return AsyncRecordBatchReader(await self._inner.execute())
return AsyncRecordBatchReader(await self._inner.execute(max_batch_length))
async def to_arrow(self) -> pa.Table:
"""

View File

@@ -507,6 +507,52 @@ def test_empty_or_nonexistent_table(tmp_path):
assert test.schema == test2.schema
@pytest.mark.asyncio
async def test_create_in_v2_mode(tmp_path):
def make_data():
for i in range(10):
yield pa.record_batch([pa.array([x for x in range(1024)])], names=["x"])
def make_table():
return pa.table([pa.array([x for x in range(10 * 1024)])], names=["x"])
schema = pa.schema([pa.field("x", pa.int64())])
db = await lancedb.connect_async(tmp_path)
# Create table in v1 mode
tbl = await db.create_table("test", data=make_data(), schema=schema)
async def is_in_v2_mode(tbl):
batches = await tbl.query().to_batches(max_batch_length=1024 * 10)
num_batches = 0
async for batch in batches:
num_batches += 1
return num_batches < 10
assert not await is_in_v2_mode(tbl)
# Create table in v2 mode
tbl = await db.create_table(
"test_v2", data=make_data(), schema=schema, use_legacy_format=False
)
assert await is_in_v2_mode(tbl)
# Add data (should remain in v2 mode)
await tbl.add(make_table())
assert await is_in_v2_mode(tbl)
# Create empty table in v2 mode and add data
tbl = await db.create_table(
"test_empty_v2", data=None, schema=schema, use_legacy_format=False
)
await tbl.add(make_table())
assert await is_in_v2_mode(tbl)
def test_replace_index(tmp_path):
db = lancedb.connect(uri=tmp_path)
table = db.create_table(

View File

@@ -91,6 +91,7 @@ impl Connection {
mode: &str,
data: &PyAny,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> PyResult<&'a PyAny> {
let inner = self_.get_inner()?.clone();
@@ -103,6 +104,10 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))
@@ -115,6 +120,7 @@ impl Connection {
mode: &str,
schema: &PyAny,
storage_options: Option<HashMap<String, String>>,
use_legacy_format: Option<bool>,
) -> PyResult<&'a PyAny> {
let inner = self_.get_inner()?.clone();
@@ -128,6 +134,10 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(use_legacy_format) = use_legacy_format {
builder = builder.use_legacy_format(use_legacy_format);
}
future_into_py(self_.py(), async move {
let table = builder.execute().await.infer_error()?;
Ok(Table::new(table))

View File

@@ -15,6 +15,7 @@
use arrow::array::make_array;
use arrow::array::ArrayData;
use arrow::pyarrow::FromPyArrow;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::{
ExecutableQuery, Query as LanceDbQuery, QueryBase, Select, VectorQuery as LanceDbVectorQuery,
};
@@ -61,10 +62,14 @@ impl Query {
Ok(VectorQuery { inner })
}
pub fn execute(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option<u32>) -> PyResult<&PyAny> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let inner_stream = inner.execute().await.infer_error()?;
let mut opts = QueryExecutionOptions::default();
if let Some(max_batch_length) = max_batch_length {
opts.max_batch_length = max_batch_length;
}
let inner_stream = inner.execute_with_options(opts).await.infer_error()?;
Ok(RecordBatchStream::new(inner_stream))
})
}
@@ -115,10 +120,14 @@ impl VectorQuery {
self.inner = self.inner.clone().bypass_vector_index()
}
pub fn execute(self_: PyRef<'_, Self>) -> PyResult<&PyAny> {
pub fn execute(self_: PyRef<'_, Self>, max_batch_length: Option<u32>) -> PyResult<&PyAny> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let inner_stream = inner.execute().await.infer_error()?;
let mut opts = QueryExecutionOptions::default();
if let Some(max_batch_length) = max_batch_length {
opts.max_batch_length = max_batch_length;
}
let inner_stream = inner.execute_with_options(opts).await.infer_error()?;
Ok(RecordBatchStream::new(inner_stream))
})
}

View File

@@ -140,6 +140,7 @@ pub struct CreateTableBuilder<const HAS_DATA: bool, T: IntoArrow> {
pub(crate) write_options: WriteOptions,
pub(crate) table_definition: Option<TableDefinition>,
pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc<dyn EmbeddingFunction>)>,
pub(crate) use_legacy_format: bool,
}
// Builder methods that only apply when we have initial data
@@ -153,6 +154,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
write_options: WriteOptions::default(),
table_definition: None,
embeddings: Vec::new(),
use_legacy_format: true,
}
}
@@ -184,6 +186,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
mode: self.mode,
write_options: self.write_options,
embeddings: self.embeddings,
use_legacy_format: self.use_legacy_format,
};
Ok((data, builder))
}
@@ -217,6 +220,7 @@ impl CreateTableBuilder<false, NoData> {
mode: CreateTableMode::default(),
write_options: WriteOptions::default(),
embeddings: Vec::new(),
use_legacy_format: false,
}
}
@@ -278,6 +282,20 @@ impl<const HAS_DATA: bool, T: IntoArrow> CreateTableBuilder<HAS_DATA, T> {
}
self
}
/// Set to true to use the v1 format for data files
///
/// This is currently defaulted to true and can be set to false to opt-in
/// to the new format. This should only be used for experimentation and
/// evaluation. The new format is still in beta and may change in ways that
/// are not backwards compatible.
///
/// Once the new format is stable, the default will change to `false` for
/// several releases and then eventually this option will be removed.
pub fn use_legacy_format(mut self, use_legacy_format: bool) -> Self {
self.use_legacy_format = use_legacy_format;
self
}
}
#[derive(Clone, Debug)]
@@ -943,6 +961,7 @@ impl ConnectionInternal for Database {
if matches!(&options.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
write_params.use_legacy_format = options.use_legacy_format;
match NativeTable::create(
&table_uri,
@@ -1040,8 +1059,12 @@ impl ConnectionInternal for Database {
#[cfg(test)]
mod tests {
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use tempfile::tempdir;
use crate::query::{ExecutableQuery, QueryExecutionOptions};
use super::*;
#[tokio::test]
@@ -1146,6 +1169,58 @@ mod tests {
assert_eq!(tables, vec!["table1".to_owned()]);
}
fn make_data() -> impl RecordBatchReader + Send + 'static {
let id = Box::new(IncrementingInt32::new().named("id".to_string()));
BatchGenerator::new().col(id).batches(10, 2000)
}
#[tokio::test]
async fn test_create_table_v2() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let tbl = db
.create_table("v1_test", make_data())
.execute()
.await
.unwrap();
// In v1 the row group size will trump max_batch_length
let batches = tbl
.query()
.execute_with_options(QueryExecutionOptions {
max_batch_length: 50000,
})
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(batches.len(), 20);
let tbl = db
.create_table("v2_test", make_data())
.use_legacy_format(false)
.execute()
.await
.unwrap();
// In v2 the page size is much bigger than 50k so we should get a single batch
let batches = tbl
.query()
.execute_with_options(QueryExecutionOptions {
max_batch_length: 50000,
})
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(batches.len(), 1);
}
#[tokio::test]
async fn drop_table() {
let tmp_dir = tempdir().unwrap();

View File

@@ -233,7 +233,8 @@ pub struct WriteOptions {
// pub on_bad_vectors: BadVectorHandling,
/// Advanced parameters that can be used to customize table creation
///
/// If set, these will take precedence over any overlapping `OpenTableBuilder` options
/// Overlapping `OpenTableBuilder` options (e.g. [AddDataBuilder::mode]) will take
/// precedence over their counterparts in `WriteOptions` (e.g. [WriteParams::mode]).
pub lance_write_params: Option<WriteParams>,
}

View File

@@ -318,6 +318,7 @@ impl EmbeddingFunction for MockEmbed {
Ok(Arc::new(arr))
}
#[allow(unused_variables)]
fn compute_query_embeddings(&self, input: Arc<dyn Array>) -> Result<Arc<dyn Array>> {
unimplemented!()
}