feat: add flag to enable faster manifest paths (#1612)

The new V2 manifest path scheme makes discovering the latest version of
a table constant time on object stores, regardless of the number of
versions in the table. See benchmarks in the PR here:
https://github.com/lancedb/lance/pull/2798

Closes #1583
This commit is contained in:
Will Jones
2024-09-09 11:34:36 -07:00
committed by GitHub
parent 029b01bbbf
commit 2a6586d6fb
16 changed files with 292 additions and 2 deletions

View File

@@ -23,6 +23,7 @@ categories = ["database-implementations"]
lance = { "version" = "=0.17.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.17.0" }
lance-linalg = { "version" = "=0.17.0" }
lance-table = { "version" = "=0.17.0" }
lance-testing = { "version" = "=0.17.0" }
lance-datafusion = { "version" = "=0.17.0" }
lance-encoding = { "version" = "=0.17.0" }

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { readdirSync } from "fs";
import { Field, Float64, Schema } from "apache-arrow";
import * as tmp from "tmp";
import { Connection, Table, connect } from "../lancedb";
import { LocalTable } from "../lancedb/table";
describe("when connecting", () => {
let tmpDir: tmp.DirResult;
@@ -134,4 +136,57 @@ describe("given a connection", () => {
await table.add(data);
await expect(isV2(table)).resolves.toBe(true);
});
it("should be able to create tables with V2 manifest paths", async () => {
const db = await connect(tmpDir.name);
let table = (await db.createEmptyTable(
"test_manifest_paths_v2_empty",
new Schema([new Field("id", new Float64(), true)]),
{
enableV2ManifestPaths: true,
},
)) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(true);
let manifestDir =
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
enableV2ManifestPaths: true,
})) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(true);
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
it("should be able to migrate tables to the V2 manifest paths", async () => {
const db = await connect(tmpDir.name);
const table = (await db.createEmptyTable(
"test_manifest_path_migration",
new Schema([new Field("id", new Float64(), true)]),
{
enableV2ManifestPaths: false,
},
)) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(false);
const manifestDir =
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
await table.migrateManifestPathsV2();
expect(await table.usesV2ManifestPaths()).toBe(true);
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
});

View File

@@ -52,6 +52,15 @@ export interface CreateTableOptions {
*/
dataStorageVersion?: string;
/**
* Use the new V2 manifest paths. These paths provide more efficient
* opening of datasets with many versions on object stores. WARNING:
* turning this on will make the dataset unreadable for older versions
* of LanceDB (prior to 0.10.0). To migrate an existing dataset, instead
* use the {@link LocalTable#migrateManifestPathsV2} method.
*/
enableV2ManifestPaths?: boolean;
/**
* If true then data files will be written with the legacy format
*
@@ -270,6 +279,7 @@ export class LocalConnection extends Connection {
mode,
cleanseStorageOptions(options?.storageOptions),
dataStorageVersion,
options?.enableV2ManifestPaths,
);
return new LocalTable(innerTable);
@@ -308,6 +318,7 @@ export class LocalConnection extends Connection {
mode,
cleanseStorageOptions(options?.storageOptions),
dataStorageVersion,
options?.enableV2ManifestPaths,
);
return new LocalTable(innerTable);
}

View File

@@ -697,4 +697,31 @@ export class LocalTable extends Table {
on = Array.isArray(on) ? on : [on];
return new MergeInsertBuilder(this.inner.mergeInsert(on));
}
/**
* Check if the table uses the new manifest path scheme.
*
* This function will return true if the table uses the V2 manifest
* path scheme.
*/
async usesV2ManifestPaths(): Promise<boolean> {
return await this.inner.usesV2ManifestPaths();
}
/**
* Migrate the table to use the new manifest path scheme.
*
* This function will rename all V1 manifests to V2 manifest paths.
* These paths provide more efficient opening of datasets with many versions
* on object stores.
*
* This function is idempotent, and can be run multiple times without
* changing the state of the object store.
*
* However, it should not be run while other concurrent operations are happening.
* And it should also run until completion before resuming other operations.
*/
async migrateManifestPathsV2(): Promise<void> {
await this.inner.migrateManifestPathsV2();
}
}

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.8.0",
"version": "0.10.0-beta.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.8.0",
"version": "0.10.0-beta.1",
"cpu": [
"x64",
"arm64"

View File

@@ -124,6 +124,7 @@ impl Connection {
mode: String,
storage_options: Option<HashMap<String, String>>,
data_storage_options: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> napi::Result<Table> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
@@ -140,6 +141,9 @@ impl Connection {
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
let tbl = builder
.execute()
.await
@@ -155,6 +159,7 @@ impl Connection {
mode: String,
storage_options: Option<HashMap<String, String>>,
data_storage_options: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> napi::Result<Table> {
let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| {
napi::Error::from_reason(format!("Failed to marshal schema from JS to Rust: {}", e))
@@ -175,6 +180,9 @@ impl Connection {
.map_err(|e| napi::Error::from_reason(format!("{}", e)))?,
);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
let tbl = builder
.execute()
.await

View File

@@ -347,6 +347,26 @@ impl Table {
let on: Vec<_> = on.iter().map(String::as_str).collect();
Ok(self.inner_ref()?.merge_insert(on.as_slice()).into())
}
#[napi(catch_unwind)]
pub async fn uses_v2_manifest_paths(&self) -> napi::Result<bool> {
self.inner_ref()?
.as_native()
.ok_or_else(|| napi::Error::from_reason("This cannot be run on a remote table"))?
.uses_v2_manifest_paths()
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn migrate_manifest_paths_v2(&self) -> napi::Result<()> {
self.inner_ref()?
.as_native()
.ok_or_else(|| napi::Error::from_reason("This cannot be run on a remote table"))?
.migrate_manifest_paths_v2()
.await
.default_error()
}
}
#[napi(object)]

View File

@@ -25,6 +25,7 @@ class Connection(object):
data: pa.RecordBatchReader,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table: ...
async def create_empty_table(
self,
@@ -33,6 +34,7 @@ class Connection(object):
schema: pa.Schema,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table: ...
class Table:

View File

@@ -567,6 +567,7 @@ class AsyncConnection(object):
*,
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -618,6 +619,14 @@ class AsyncConnection(object):
If True, use the legacy format for the table. If False, use the new format.
The default is True while the new format is in beta.
This method is deprecated, use `data_storage_version` instead.
enable_v2_manifest_paths: bool, optional, default False
Use the new V2 manifest paths. These paths provide more efficient
opening of datasets with many versions on object stores. WARNING:
turning this on will make the dataset unreadable for older versions
of LanceDB (prior to 0.13.0). To migrate an existing dataset, instead
use the
[AsyncTable.migrate_manifest_paths_v2][lancedb.table.AsyncTable.migrate_manifest_paths_v2]
method.
Returns
@@ -761,6 +770,7 @@ class AsyncConnection(object):
schema,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
else:
data = data_to_reader(data, schema)
@@ -770,6 +780,7 @@ class AsyncConnection(object):
data,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
return AsyncTable(new_table)

View File

@@ -2539,3 +2539,34 @@ class AsyncTable:
List all indices that have been created with Self::create_index
"""
return await self._inner.list_indices()
async def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
Returns
-------
bool
True if the table is using the new v2 manifest paths, False otherwise.
"""
return await self._inner.uses_v2_manifest_paths()
async def migrate_manifest_paths_v2(self):
"""
Migrate the manifest paths to the new format.
This will update the manifest to use the new v2 format for paths.
This function is idempotent, and can be run multiple times without
changing the state of the object store.
!!! danger
This should not be run while other concurrent operations are happening.
And it should also run until completion before resuming other operations.
You can use
[AsyncTable.uses_v2_manifest_paths][lancedb.table.AsyncTable.uses_v2_manifest_paths]
to check if the table is already using the new path style.
"""
await self._inner.migrate_manifest_paths_v2()

View File

@@ -13,6 +13,7 @@
import re
from datetime import timedelta
import os
import lancedb
import numpy as np
@@ -413,6 +414,40 @@ async def test_create_exist_ok_async(tmp_path):
# await db.create_table("test", schema=bad_schema, exist_ok=True)
@pytest.mark.asyncio
async def test_create_table_v2_manifest_paths_async(tmp_path):
db = await lancedb.connect_async(tmp_path)
# Create table in v2 mode with v2 manifest paths enabled
tbl = await db.create_table(
"test_v2_manifest_paths",
data=[{"id": 0}],
use_legacy_format=False,
enable_v2_manifest_paths=True,
)
assert await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
assert re.match(r"\d{20}\.manifest", manifest)
# Start a table in V1 mode then migrate
tbl = await db.create_table(
"test_v2_migration",
data=[{"id": 0}],
use_legacy_format=False,
enable_v2_manifest_paths=False,
)
assert not await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
assert re.match(r"\d\.manifest", manifest)
await tbl.migrate_manifest_paths_v2()
assert await tbl.uses_v2_manifest_paths()
for manifest in os.listdir(manifests_dir):
assert re.match(r"\d{20}\.manifest", manifest)
def test_open_table_sync(tmp_path):
db = lancedb.connect(tmp_path)
db.create_table("test", data=[{"id": 0}])

View File

@@ -81,6 +81,7 @@ impl Connection {
data: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
data_storage_version: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -93,6 +94,10 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)
@@ -113,6 +118,7 @@ impl Connection {
schema: Bound<'_, PyAny>,
storage_options: Option<HashMap<String, String>>,
data_storage_version: Option<String>,
enable_v2_manifest_paths: Option<bool>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.get_inner()?.clone();
@@ -126,6 +132,10 @@ impl Connection {
builder = builder.storage_options(storage_options);
}
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
builder = builder.enable_v2_manifest_paths(enable_v2_manifest_paths);
}
if let Some(data_storage_version) = data_storage_version.as_ref() {
builder = builder.data_storage_version(
LanceFileVersion::from_str(data_storage_version)

View File

@@ -303,4 +303,28 @@ impl Table {
})
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?
.uses_v2_manifest_paths()
.await
.infer_error()
})
}
pub fn migrate_manifest_paths_v2(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?
.migrate_manifest_paths_v2()
.await
.infer_error()
})
}
}

View File

@@ -27,6 +27,7 @@ lazy_static.workspace = true
lance = { workspace = true }
lance-datafusion.workspace = true
lance-index = { workspace = true }
lance-table = { workspace = true }
lance-linalg = { workspace = true }
lance-testing = { workspace = true }
lance-encoding = { workspace = true }

View File

@@ -142,6 +142,7 @@ pub struct CreateTableBuilder<const HAS_DATA: bool, T: IntoArrow> {
pub(crate) table_definition: Option<TableDefinition>,
pub(crate) embeddings: Vec<(EmbeddingDefinition, Arc<dyn EmbeddingFunction>)>,
pub(crate) data_storage_version: Option<LanceFileVersion>,
pub(crate) enable_v2_manifest_paths: Option<bool>,
}
// Builder methods that only apply when we have initial data
@@ -156,6 +157,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
table_definition: None,
embeddings: Vec::new(),
data_storage_version: None,
enable_v2_manifest_paths: None,
}
}
@@ -188,6 +190,7 @@ impl<T: IntoArrow> CreateTableBuilder<true, T> {
write_options: self.write_options,
embeddings: self.embeddings,
data_storage_version: self.data_storage_version,
enable_v2_manifest_paths: self.enable_v2_manifest_paths,
};
Ok((data, builder))
}
@@ -222,6 +225,7 @@ impl CreateTableBuilder<false, NoData> {
write_options: WriteOptions::default(),
embeddings: Vec::new(),
data_storage_version: None,
enable_v2_manifest_paths: None,
}
}
@@ -284,6 +288,23 @@ impl<const HAS_DATA: bool, T: IntoArrow> CreateTableBuilder<HAS_DATA, T> {
self
}
/// Set whether to use V2 manifest paths for the table. (default: false)
///
/// These paths provide more efficient opening of tables with many
/// versions on object stores.
///
/// <div class="warning">Turning this on will make the dataset unreadable
/// for older versions of LanceDB (prior to 0.10.0).</div>
///
/// To migrate an existing dataset, instead use the
/// [[NativeTable::migrate_manifest_paths_v2]].
///
/// This has no effect in LanceDB Cloud.
pub fn enable_v2_manifest_paths(mut self, use_v2_manifest_paths: bool) -> Self {
self.enable_v2_manifest_paths = Some(use_v2_manifest_paths);
self
}
/// Set the data storage version.
///
/// The default is `LanceFileVersion::Legacy`.
@@ -976,7 +997,10 @@ impl ConnectionInternal for Database {
if matches!(&options.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}
write_params.data_storage_version = options.data_storage_version;
write_params.enable_v2_manifest_paths =
options.enable_v2_manifest_paths.unwrap_or_default();
match NativeTable::create(
&table_uri,

View File

@@ -43,6 +43,7 @@ use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_table::io::commit::ManifestNamingScheme;
use log::info;
use serde::{Deserialize, Serialize};
use snafu::whatever;
@@ -1646,6 +1647,35 @@ impl NativeTable {
Default::default(),
)?))
}
/// Check whether the table uses V2 manifest paths.
///
/// See [Self::migrate_manifest_paths_v2] and [ManifestNamingScheme] for
/// more information.
pub async fn uses_v2_manifest_paths(&self) -> Result<bool> {
let dataset = self.dataset.get().await?;
Ok(dataset.manifest_naming_scheme == ManifestNamingScheme::V2)
}
/// Migrate the table to use the new manifest path scheme.
///
/// This function will rename all V1 manifests to V2 manifest paths.
/// These paths provide more efficient opening of datasets with many versions
/// on object stores.
///
/// This function is idempotent, and can be run multiple times without
/// changing the state of the object store.
///
/// However, it should not be run while other concurrent operations are happening.
/// And it should also run until completion before resuming other operations.
///
/// You can use [Self::uses_v2_manifest_paths] to check if the table is already
/// using V2 manifest paths.
pub async fn migrate_manifest_paths_v2(&self) -> Result<()> {
let mut dataset = self.dataset.get_mut().await?;
dataset.migrate_manifest_paths_v2().await?;
Ok(())
}
}
#[async_trait::async_trait]