diff --git a/ci/run_with_test_connection.sh b/ci/run_with_test_connection.sh index b0cf68a7..0f0cd691 100755 --- a/ci/run_with_test_connection.sh +++ b/ci/run_with_test_connection.sh @@ -16,7 +16,7 @@ check_command_exists() { } if [[ ! -e ./lancedb ]]; then - if [[ -v SOPHON_READ_TOKEN ]]; then + if [[ x${SOPHON_READ_TOKEN} != "x" ]]; then INPUT="lancedb-linux-x64" gh release \ --repo lancedb/lancedb \ diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index b086b76f..7743e339 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -1325,25 +1325,27 @@ mod tests { #[tokio::test] async fn test_table_names() { - let tmp_dir = tempdir().unwrap(); + let tc = new_test_connection().await.unwrap(); + let db = tc.connection; + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); let mut names = Vec::with_capacity(100); for _ in 0..100 { - let mut name = uuid::Uuid::new_v4().to_string(); + let name = uuid::Uuid::new_v4().to_string(); names.push(name.clone()); - name.push_str(".lance"); - create_dir_all(tmp_dir.path().join(&name)).unwrap(); + db.create_empty_table(name, schema.clone()) + .execute() + .await + .unwrap(); } names.sort(); - - let uri = tmp_dir.path().to_str().unwrap(); - let db = connect(uri).execute().await.unwrap(); - let tables = db.table_names().execute().await.unwrap(); + let tables = db.table_names().limit(100).execute().await.unwrap(); assert_eq!(tables, names); let tables = db .table_names() .start_after(&names[30]) + .limit(100) .execute() .await .unwrap(); diff --git a/rust/lancedb/src/test_utils/connection.rs b/rust/lancedb/src/test_utils/connection.rs index 2811f05f..c5c3294e 100644 --- a/rust/lancedb/src/test_utils/connection.rs +++ b/rust/lancedb/src/test_utils/connection.rs @@ -5,16 +5,19 @@ use regex::Regex; use std::env; -use std::io::{BufRead, BufReader}; -use std::process::{Child, ChildStdout, Command, Stdio}; +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{Child, ChildStdout, Command}; +use tokio::sync::mpsc; use crate::{connect, Connection}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use tempfile::{tempdir, TempDir}; pub struct TestConnection { pub uri: String, pub connection: Connection, + pub is_remote: bool, _temp_dir: Option, _process: Option, } @@ -37,6 +40,56 @@ pub async fn new_test_connection() -> Result { } } +async fn spawn_stdout_reader( + mut stdout: BufReader, + port_sender: mpsc::Sender>, +) -> tokio::task::JoinHandle<()> { + let print_stdout = env::var("PRINT_LANCEDB_TEST_CONNECTION_SCRIPT_OUTPUT").is_ok(); + tokio::spawn(async move { + let mut line = String::new(); + let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap(); + loop { + line.clear(); + let result = stdout.read_line(&mut line).await; + if let Err(err) = result { + port_sender + .send(Err(anyhow!( + "error while reading from process output: {}", + err + ))) + .await + .unwrap(); + return; + } else if result.unwrap() == 0 { + port_sender + .send(Err(anyhow!( + " hit EOF before reading port from process output." + ))) + .await + .unwrap(); + return; + } + if re.is_match(&line) { + let caps = re.captures(&line).unwrap(); + port_sender.send(Ok(caps[1].to_string())).await.unwrap(); + break; + } + } + loop { + line.clear(); + match stdout.read_line(&mut line).await { + Err(_) => return, + Ok(0) => return, + Ok(_size) => { + if print_stdout { + print!("{}", line); + } + } + } + } + }) +} + async fn new_remote_connection(script_path: &str) -> Result { let temp_dir = tempdir()?; let data_path = temp_dir.path().to_str().unwrap().to_string(); @@ -57,38 +110,25 @@ async fn new_remote_connection(script_path: &str) -> Result { child: child_result.unwrap(), }; let stdout = BufReader::new(process.child.stdout.take().unwrap()); - let port = read_process_port(stdout)?; + let (port_sender, mut port_receiver) = mpsc::channel(5); + let _reader = spawn_stdout_reader(stdout, port_sender).await; + let port = match port_receiver.recv().await { + None => bail!("Unable to determine the port number used by the phalanx process we spawned, because the reader thread was closed too soon."), + Some(Err(err)) => bail!("Unable to determine the port number used by the phalanx process we spawned, because of an error, {}", err), + Some(Ok(port)) => port, + }; let uri = "db://test"; let host_override = format!("http://localhost:{}", port); let connection = create_new_connection(uri, &host_override).await?; Ok(TestConnection { uri: uri.to_string(), connection, + is_remote: true, _temp_dir: Some(temp_dir), _process: Some(process), }) } -fn read_process_port(mut stdout: BufReader) -> Result { - let mut line = String::new(); - let re = Regex::new(r"Query node now listening on 0.0.0.0:(.*)").unwrap(); - loop { - let result = stdout.read_line(&mut line); - if let Err(err) = result { - bail!(format!( - "read_process_port: error while reading from process output: {}", - err - )); - } else if result.unwrap() == 0 { - bail!("read_process_port: hit EOF before reading port from process output."); - } - if re.is_match(&line) { - let caps = re.captures(&line).unwrap(); - return Ok(caps[1].to_string()); - } - } -} - #[cfg(feature = "remote")] async fn create_new_connection(uri: &str, host_override: &str) -> crate::error::Result { connect(uri) @@ -114,6 +154,7 @@ async fn new_local_connection() -> Result { Ok(TestConnection { uri: uri.to_string(), connection, + is_remote: false, _temp_dir: Some(temp_dir), _process: None, })