test: convert test_table_names to test both remote and local (#2888)

Convert test_table_names to test both remote and local connections.

This PR also includes some miscellaneous improvements in
src/test_utils/connection.rs. It starts a thread to drain stdout from
the server process. It adds the
PRINT_LANCEDB_TEST_CONNECTION_SCRIPT_OUTPUT environment variable, which
optionally displays server stdout.

Fix a bash conditional in run_with_test_connection.sh.
This commit is contained in:
Colin Patrick McCabe
2026-01-02 15:08:44 -08:00
committed by GitHub
parent 8bcac7e372
commit ac164c352b
3 changed files with 76 additions and 33 deletions

View File

@@ -16,7 +16,7 @@ check_command_exists() {
}
if [[ ! -e ./lancedb ]]; then
if [[ -v SOPHON_READ_TOKEN ]]; then
if [[ x${SOPHON_READ_TOKEN} != "x" ]]; then
INPUT="lancedb-linux-x64"
gh release \
--repo lancedb/lancedb \

View File

@@ -1325,25 +1325,27 @@ mod tests {
#[tokio::test]
async fn test_table_names() {
let tmp_dir = tempdir().unwrap();
let tc = new_test_connection().await.unwrap();
let db = tc.connection;
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let mut names = Vec::with_capacity(100);
for _ in 0..100 {
let mut name = uuid::Uuid::new_v4().to_string();
let name = uuid::Uuid::new_v4().to_string();
names.push(name.clone());
name.push_str(".lance");
create_dir_all(tmp_dir.path().join(&name)).unwrap();
db.create_empty_table(name, schema.clone())
.execute()
.await
.unwrap();
}
names.sort();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let tables = db.table_names().execute().await.unwrap();
let tables = db.table_names().limit(100).execute().await.unwrap();
assert_eq!(tables, names);
let tables = db
.table_names()
.start_after(&names[30])
.limit(100)
.execute()
.await
.unwrap();

View File

@@ -5,16 +5,19 @@
use regex::Regex;
use std::env;
use std::io::{BufRead, BufReader};
use std::process::{Child, ChildStdout, Command, Stdio};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, ChildStdout, Command};
use tokio::sync::mpsc;
use crate::{connect, Connection};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use tempfile::{tempdir, TempDir};
pub struct TestConnection {
pub uri: String,
pub connection: Connection,
pub is_remote: bool,
_temp_dir: Option<TempDir>,
_process: Option<TestProcess>,
}
@@ -37,6 +40,56 @@ pub async fn new_test_connection() -> Result<TestConnection> {
}
}
async fn spawn_stdout_reader(
mut stdout: BufReader<ChildStdout>,
port_sender: mpsc::Sender<anyhow::Result<String>>,
) -> tokio::task::JoinHandle<()> {
let print_stdout = env::var("PRINT_LANCEDB_TEST_CONNECTION_SCRIPT_OUTPUT").is_ok();
tokio::spawn(async move {
let mut line = String::new();
let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap();
loop {
line.clear();
let result = stdout.read_line(&mut line).await;
if let Err(err) = result {
port_sender
.send(Err(anyhow!(
"error while reading from process output: {}",
err
)))
.await
.unwrap();
return;
} else if result.unwrap() == 0 {
port_sender
.send(Err(anyhow!(
" hit EOF before reading port from process output."
)))
.await
.unwrap();
return;
}
if re.is_match(&line) {
let caps = re.captures(&line).unwrap();
port_sender.send(Ok(caps[1].to_string())).await.unwrap();
break;
}
}
loop {
line.clear();
match stdout.read_line(&mut line).await {
Err(_) => return,
Ok(0) => return,
Ok(_size) => {
if print_stdout {
print!("{}", line);
}
}
}
}
})
}
async fn new_remote_connection(script_path: &str) -> Result<TestConnection> {
let temp_dir = tempdir()?;
let data_path = temp_dir.path().to_str().unwrap().to_string();
@@ -57,38 +110,25 @@ async fn new_remote_connection(script_path: &str) -> Result<TestConnection> {
child: child_result.unwrap(),
};
let stdout = BufReader::new(process.child.stdout.take().unwrap());
let port = read_process_port(stdout)?;
let (port_sender, mut port_receiver) = mpsc::channel(5);
let _reader = spawn_stdout_reader(stdout, port_sender).await;
let port = match port_receiver.recv().await {
None => bail!("Unable to determine the port number used by the phalanx process we spawned, because the reader thread was closed too soon."),
Some(Err(err)) => bail!("Unable to determine the port number used by the phalanx process we spawned, because of an error, {}", err),
Some(Ok(port)) => port,
};
let uri = "db://test";
let host_override = format!("http://localhost:{}", port);
let connection = create_new_connection(uri, &host_override).await?;
Ok(TestConnection {
uri: uri.to_string(),
connection,
is_remote: true,
_temp_dir: Some(temp_dir),
_process: Some(process),
})
}
fn read_process_port(mut stdout: BufReader<ChildStdout>) -> Result<String> {
let mut line = String::new();
let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap();
loop {
let result = stdout.read_line(&mut line);
if let Err(err) = result {
bail!(format!(
"read_process_port: error while reading from process output: {}",
err
));
} else if result.unwrap() == 0 {
bail!("read_process_port: hit EOF before reading port from process output.");
}
if re.is_match(&line) {
let caps = re.captures(&line).unwrap();
return Ok(caps[1].to_string());
}
}
}
#[cfg(feature = "remote")]
async fn create_new_connection(uri: &str, host_override: &str) -> crate::error::Result<Connection> {
connect(uri)
@@ -114,6 +154,7 @@ async fn new_local_connection() -> Result<TestConnection> {
Ok(TestConnection {
uri: uri.to_string(),
connection,
is_remote: false,
_temp_dir: Some(temp_dir),
_process: None,
})