mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
test(servers): Fix OpenTSDB shutdown test occasionally fails (#311)
* test(servers): OpenTSDB shutdown test cover error branch Create connection continuously to cover some branches of error handling in OpentsdbServer * test(servers): Add more tests for opentsdb server Add a test to ensure we could not connect the server after shutdown and a test to check existing connection usage after shutdown
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::Rng;
|
||||
use servers::error::{self as server_error, Result};
|
||||
use servers::error::{self as server_error, Error, Result};
|
||||
use servers::opentsdb::codec::DataPoint;
|
||||
use servers::opentsdb::connection::Connection;
|
||||
use servers::opentsdb::OpentsdbServer;
|
||||
@@ -14,6 +15,7 @@ use servers::query_handler::OpentsdbProtocolHandler;
|
||||
use servers::server::Server;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
struct DummyOpentsdbInstance {
|
||||
tx: mpsc::Sender<i32>,
|
||||
@@ -64,7 +66,7 @@ async fn test_start_opentsdb_server() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_shutdown_opentsdb_server() -> Result<()> {
|
||||
async fn test_shutdown_opentsdb_server_concurrently() -> Result<()> {
|
||||
let (tx, _) = mpsc::channel(100);
|
||||
let mut server = create_opentsdb_server(tx)?;
|
||||
let result = server.shutdown().await;
|
||||
@@ -76,36 +78,120 @@ async fn test_shutdown_opentsdb_server() -> Result<()> {
|
||||
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
|
||||
let addr = server.start(listening).await?;
|
||||
|
||||
let mut join_handles = vec![];
|
||||
for _ in 0..2 {
|
||||
join_handles.push(tokio::spawn(async move {
|
||||
for i in 0..1000 {
|
||||
let stream = TcpStream::connect(addr).await;
|
||||
match stream {
|
||||
Ok(stream) => {
|
||||
let mut connection = Connection::new(stream);
|
||||
let result = connection.write_line(format!("put {} 1 1", i)).await;
|
||||
if let Err(e) = result {
|
||||
return Err(e.to_string());
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_in_task = notify.clone();
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let stop_task = stop.clone();
|
||||
let stop_timeout = Duration::from_secs(5 * 60);
|
||||
let join_handle = tokio::spawn(async move {
|
||||
let mut i = 1;
|
||||
let mut stop_time = None;
|
||||
|
||||
loop {
|
||||
let stream = TcpStream::connect(addr).await;
|
||||
match stream {
|
||||
Ok(stream) => {
|
||||
let mut connection = Connection::new(stream);
|
||||
let result = connection.write_line(format!("put {} 1 1", i)).await;
|
||||
i += 1;
|
||||
|
||||
if i > 4 {
|
||||
// Ensure the server has been started.
|
||||
notify_in_task.notify_one();
|
||||
}
|
||||
|
||||
if let Err(e) = result {
|
||||
match e {
|
||||
Error::InternalIo { .. } => return,
|
||||
_ => panic!("Not IO error, err is {}", e),
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.to_string()),
|
||||
|
||||
if stop.load(Ordering::Relaxed) {
|
||||
let du_since_stop = stop_time.get_or_insert_with(Instant::now).elapsed();
|
||||
if du_since_stop > stop_timeout {
|
||||
// Avoid hang on test.
|
||||
panic!("Stop timeout");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
notify.notified().await;
|
||||
|
||||
server.shutdown().await.unwrap();
|
||||
|
||||
stop_task.store(true, Ordering::Relaxed);
|
||||
join_handle.await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_opentsdb_connection_shutdown() -> Result<()> {
|
||||
let (tx, _) = mpsc::channel(100);
|
||||
let mut server = create_opentsdb_server(tx)?;
|
||||
let result = server.shutdown().await;
|
||||
assert!(result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("OpenTSDB server is not started."));
|
||||
|
||||
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
|
||||
let addr = server.start(listening).await?;
|
||||
|
||||
let stream = TcpStream::connect(addr).await.unwrap();
|
||||
let mut connection = Connection::new(stream);
|
||||
connection
|
||||
.write_line("put 1 1 1".to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
server.shutdown().await.unwrap();
|
||||
|
||||
let shutdown_time = Instant::now();
|
||||
let timeout = Duration::from_secs(5 * 60);
|
||||
let mut i = 2;
|
||||
loop {
|
||||
// The connection may not be unwritable after shutdown immediately.
|
||||
let result = connection.write_line(format!("put {} 1 1", i)).await;
|
||||
i += 1;
|
||||
if result.is_err() {
|
||||
if let Err(e) = result {
|
||||
match e {
|
||||
Error::InternalIo { .. } => break,
|
||||
_ => panic!("Not IO error, err is {}", e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
if shutdown_time.elapsed() > timeout {
|
||||
panic!("Shutdown timeout");
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_opentsdb_connect_after_shutdown() -> Result<()> {
|
||||
let (tx, _) = mpsc::channel(100);
|
||||
let mut server = create_opentsdb_server(tx)?;
|
||||
let result = server.shutdown().await;
|
||||
assert!(result.is_ok());
|
||||
assert!(result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("OpenTSDB server is not started."));
|
||||
|
||||
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
|
||||
let addr = server.start(listening).await?;
|
||||
|
||||
server.shutdown().await.unwrap();
|
||||
|
||||
TcpStream::connect(addr).await.unwrap_err();
|
||||
|
||||
for handle in join_handles.iter_mut() {
|
||||
let result = handle.await.unwrap();
|
||||
assert!(result.is_err());
|
||||
let error = result.unwrap_err();
|
||||
assert!(error.contains("Connection refused") || error.contains("Connection reset by peer"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user