mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-20 13:30:41 +00:00
feat: add support for test_remote_connections (#2666)
Add a new test feature which allows for running the lancedb tests against a remote server. Convert over a few tests in src/connection.rs as a proof of concept. To make local development easier, the remote tests can be run locally from a Makefile. This file can also be used to run the feature tests, with a single invocation of 'make'. (The feature tests require bringing up a docker compose environment.)
This commit is contained in:
committed by
GitHub
parent
13c613d45f
commit
82b25a71e9
@@ -82,6 +82,7 @@ crunchy.workspace = true
|
||||
bytemuck_derive.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1"
|
||||
tempfile = "3.5.0"
|
||||
rand = { version = "0.9", features = ["small_rng"] }
|
||||
random_word = { version = "0.4.3", features = ["en"] }
|
||||
|
||||
19
rust/lancedb/Makefile
Normal file
19
rust/lancedb/Makefile
Normal file
@@ -0,0 +1,19 @@
|
||||
#
|
||||
# Makefile for running tests.
|
||||
#
|
||||
|
||||
# Run all tests.
|
||||
all-tests: feature-tests remote-tests
|
||||
|
||||
# Run tests for every feature. This requires using docker compose to set up
|
||||
# the environment.
|
||||
feature-tests:
|
||||
../../ci/run_with_docker_compose.sh \
|
||||
cargo test --all-features --tests --locked --examples
|
||||
.PHONY: feature-tests
|
||||
|
||||
# Run tests against remote endpoints.
|
||||
remote-tests:
|
||||
../../ci/run_with_test_connection.sh \
|
||||
cargo test --features remote --locked
|
||||
.PHONY: remote-tests
|
||||
@@ -1170,6 +1170,7 @@ mod tests {
|
||||
use crate::database::listing::{ListingDatabaseOptions, NewTableConfig};
|
||||
use crate::query::QueryBase;
|
||||
use crate::query::{ExecutableQuery, QueryExecutionOptions};
|
||||
use crate::test_connection::test_utils::new_test_connection;
|
||||
use arrow::compute::concat_batches;
|
||||
use arrow_array::RecordBatchReader;
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
@@ -1185,11 +1186,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let db = connect(uri).execute().await.unwrap();
|
||||
|
||||
assert_eq!(db.uri, uri);
|
||||
let tc = new_test_connection().await.unwrap();
|
||||
assert_eq!(tc.connection.uri, tc.uri);
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
@@ -1255,16 +1253,10 @@ mod tests {
|
||||
assert_eq!(tables, names[..7]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect_s3() {
|
||||
// let db = Database::connect("s3://bucket/path/to/database").await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let db = connect(uri).execute().await.unwrap();
|
||||
let tc = new_test_connection().await.unwrap();
|
||||
let db = tc.connection;
|
||||
|
||||
assert_eq!(db.table_names().execute().await.unwrap().len(), 0);
|
||||
// open non-exist table
|
||||
|
||||
@@ -206,6 +206,7 @@ pub mod query;
|
||||
pub mod remote;
|
||||
pub mod rerankers;
|
||||
pub mod table;
|
||||
pub mod test_connection;
|
||||
pub mod utils;
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
126
rust/lancedb/src/test_connection.rs
Normal file
126
rust/lancedb/src/test_connection.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Functions for testing connections.
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test_utils {
|
||||
use regex::Regex;
|
||||
use std::env;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::process::{Child, ChildStdout, Command, Stdio};
|
||||
|
||||
use crate::{connect, Connection};
|
||||
use anyhow::{bail, Result};
|
||||
use tempfile::{tempdir, TempDir};
|
||||
|
||||
pub struct TestConnection {
|
||||
pub uri: String,
|
||||
pub connection: Connection,
|
||||
_temp_dir: Option<TempDir>,
|
||||
_process: Option<TestProcess>,
|
||||
}
|
||||
|
||||
struct TestProcess {
|
||||
child: Child,
|
||||
}
|
||||
|
||||
impl Drop for TestProcess {
|
||||
#[allow(unused_must_use)]
|
||||
fn drop(&mut self) {
|
||||
self.child.kill();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_test_connection() -> Result<TestConnection> {
|
||||
match env::var("CREATE_LANCEDB_TEST_CONNECTION_SCRIPT") {
|
||||
Ok(script_path) => new_remote_connection(&script_path).await,
|
||||
Err(_e) => new_local_connection().await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_remote_connection(script_path: &str) -> Result<TestConnection> {
|
||||
let temp_dir = tempdir()?;
|
||||
let data_path = temp_dir.path().to_str().unwrap().to_string();
|
||||
let child_result = Command::new(script_path)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.arg(data_path.clone())
|
||||
.spawn();
|
||||
if child_result.is_err() {
|
||||
bail!(format!(
|
||||
"Unable to run {}: {:?}",
|
||||
script_path,
|
||||
child_result.err()
|
||||
));
|
||||
}
|
||||
let mut process = TestProcess {
|
||||
child: child_result.unwrap(),
|
||||
};
|
||||
let stdout = BufReader::new(process.child.stdout.take().unwrap());
|
||||
let port = read_process_port(stdout)?;
|
||||
let uri = "db://test";
|
||||
let host_override = format!("http://localhost:{}", port);
|
||||
let connection = create_new_connection(uri, &host_override).await?;
|
||||
Ok(TestConnection {
|
||||
uri: uri.to_string(),
|
||||
connection,
|
||||
_temp_dir: Some(temp_dir),
|
||||
_process: Some(process),
|
||||
})
|
||||
}
|
||||
|
||||
fn read_process_port(mut stdout: BufReader<ChildStdout>) -> Result<String> {
|
||||
let mut line = String::new();
|
||||
let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap();
|
||||
loop {
|
||||
let result = stdout.read_line(&mut line);
|
||||
if let Err(err) = result {
|
||||
bail!(format!(
|
||||
"read_process_port: error while reading from process output: {}",
|
||||
err
|
||||
));
|
||||
} else if result.unwrap() == 0 {
|
||||
bail!("read_process_port: hit EOF before reading port from process output.");
|
||||
}
|
||||
if re.is_match(&line) {
|
||||
let caps = re.captures(&line).unwrap();
|
||||
return Ok(caps[1].to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "remote")]
|
||||
async fn create_new_connection(
|
||||
uri: &str,
|
||||
host_override: &str,
|
||||
) -> crate::error::Result<Connection> {
|
||||
connect(uri)
|
||||
.region("us-east-1")
|
||||
.api_key("sk_localtest")
|
||||
.host_override(host_override)
|
||||
.execute()
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "remote"))]
|
||||
async fn create_new_connection(
|
||||
_uri: &str,
|
||||
_host_override: &str,
|
||||
) -> crate::error::Result<Connection> {
|
||||
panic!("remote feature not supported");
|
||||
}
|
||||
|
||||
async fn new_local_connection() -> Result<TestConnection> {
|
||||
let temp_dir = tempdir()?;
|
||||
let uri = temp_dir.path().to_str().unwrap();
|
||||
let connection = connect(uri).execute().await?;
|
||||
Ok(TestConnection {
|
||||
uri: uri.to_string(),
|
||||
connection,
|
||||
_temp_dir: Some(temp_dir),
|
||||
_process: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user