diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs index 8e04cd2640..a02f795d49 100644 --- a/src/servers/tests/opentsdb.rs +++ b/src/servers/tests/opentsdb.rs @@ -1,12 +1,13 @@ use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use common_runtime::Builder as RuntimeBuilder; use rand::rngs::StdRng; use rand::Rng; -use servers::error::{self as server_error, Result}; +use servers::error::{self as server_error, Error, Result}; use servers::opentsdb::codec::DataPoint; use servers::opentsdb::connection::Connection; use servers::opentsdb::OpentsdbServer; @@ -14,6 +15,7 @@ use servers::query_handler::OpentsdbProtocolHandler; use servers::server::Server; use tokio::net::TcpStream; use tokio::sync::mpsc; +use tokio::sync::Notify; struct DummyOpentsdbInstance { tx: mpsc::Sender, @@ -64,7 +66,7 @@ async fn test_start_opentsdb_server() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_shutdown_opentsdb_server() -> Result<()> { +async fn test_shutdown_opentsdb_server_concurrently() -> Result<()> { let (tx, _) = mpsc::channel(100); let mut server = create_opentsdb_server(tx)?; let result = server.shutdown().await; @@ -76,36 +78,120 @@ async fn test_shutdown_opentsdb_server() -> Result<()> { let listening = "127.0.0.1:0".parse::().unwrap(); let addr = server.start(listening).await?; - let mut join_handles = vec![]; - for _ in 0..2 { - join_handles.push(tokio::spawn(async move { - for i in 0..1000 { - let stream = TcpStream::connect(addr).await; - match stream { - Ok(stream) => { - let mut connection = Connection::new(stream); - let result = connection.write_line(format!("put {} 1 1", i)).await; - if let Err(e) = result { - return Err(e.to_string()); + let notify = Arc::new(Notify::new()); + let notify_in_task = notify.clone(); + let stop = Arc::new(AtomicBool::new(false)); + let stop_task = stop.clone(); + let stop_timeout = Duration::from_secs(5 * 60); + let join_handle = tokio::spawn(async move { + let mut i = 1; + let mut stop_time = None; + + loop { + let stream = TcpStream::connect(addr).await; + match stream { + Ok(stream) => { + let mut connection = Connection::new(stream); + let result = connection.write_line(format!("put {} 1 1", i)).await; + i += 1; + + if i > 4 { + // Ensure the server has been started. + notify_in_task.notify_one(); + } + + if let Err(e) = result { + match e { + Error::InternalIo { .. } => return, + _ => panic!("Not IO error, err is {}", e), } } - Err(e) => return Err(e.to_string()), + + if stop.load(Ordering::Relaxed) { + let du_since_stop = stop_time.get_or_insert_with(Instant::now).elapsed(); + if du_since_stop > stop_timeout { + // Avoid hang on test. + panic!("Stop timeout"); + } + } + } + Err(_) => return, + } + } + }); + + notify.notified().await; + + server.shutdown().await.unwrap(); + + stop_task.store(true, Ordering::Relaxed); + join_handle.await.unwrap(); + + Ok(()) +} + +#[tokio::test] +async fn test_opentsdb_connection_shutdown() -> Result<()> { + let (tx, _) = mpsc::channel(100); + let mut server = create_opentsdb_server(tx)?; + let result = server.shutdown().await; + assert!(result + .unwrap_err() + .to_string() + .contains("OpenTSDB server is not started.")); + + let listening = "127.0.0.1:0".parse::().unwrap(); + let addr = server.start(listening).await?; + + let stream = TcpStream::connect(addr).await.unwrap(); + let mut connection = Connection::new(stream); + connection + .write_line("put 1 1 1".to_string()) + .await + .unwrap(); + + server.shutdown().await.unwrap(); + + let shutdown_time = Instant::now(); + let timeout = Duration::from_secs(5 * 60); + let mut i = 2; + loop { + // The connection may not be unwritable after shutdown immediately. + let result = connection.write_line(format!("put {} 1 1", i)).await; + i += 1; + if result.is_err() { + if let Err(e) = result { + match e { + Error::InternalIo { .. } => break, + _ => panic!("Not IO error, err is {}", e), } } - Ok(()) - })) + } + if shutdown_time.elapsed() > timeout { + panic!("Shutdown timeout"); + } } - tokio::time::sleep(Duration::from_millis(10)).await; + Ok(()) +} + +#[tokio::test] +async fn test_opentsdb_connect_after_shutdown() -> Result<()> { + let (tx, _) = mpsc::channel(100); + let mut server = create_opentsdb_server(tx)?; let result = server.shutdown().await; - assert!(result.is_ok()); + assert!(result + .unwrap_err() + .to_string() + .contains("OpenTSDB server is not started.")); + + let listening = "127.0.0.1:0".parse::().unwrap(); + let addr = server.start(listening).await?; + + server.shutdown().await.unwrap(); + + TcpStream::connect(addr).await.unwrap_err(); - for handle in join_handles.iter_mut() { - let result = handle.await.unwrap(); - assert!(result.is_err()); - let error = result.unwrap_err(); - assert!(error.contains("Connection refused") || error.contains("Connection reset by peer")); - } Ok(()) }