refactor!: remove opentsdb tcp server (#3828)

* refactor: remove opentsdb tcp server

* refactor: remove config and add test

* refactor: update docs and remove unused code
shuiyisong
2024-05-06 14:42:05 +08:00
committed by GitHub
parent 6e12e1b84b
commit 6e9e8fad26
17 changed files with 15 additions and 953 deletions

View File

@@ -33,9 +33,7 @@
| `postgres.tls.key_path` | String | `None` | Private key file path. |
| `postgres.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload |
| `opentsdb` | -- | -- | OpenTSDB protocol options. |
| `opentsdb.enable` | Bool | `true` | Whether to enable |
| `opentsdb.addr` | String | `127.0.0.1:4242` | OpenTSDB telnet API server address. |
| `opentsdb.runtime_size` | Integer | `2` | The number of server worker threads. |
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |
@@ -168,9 +166,7 @@
| `postgres.tls.key_path` | String | `None` | Private key file path. |
| `postgres.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload |
| `opentsdb` | -- | -- | OpenTSDB protocol options. |
| `opentsdb.enable` | Bool | `true` | Whether to enable |
| `opentsdb.addr` | String | `127.0.0.1:4242` | OpenTSDB telnet API server address. |
| `opentsdb.runtime_size` | Integer | `2` | The number of server worker threads. |
| `opentsdb.enable` | Bool | `true` | Whether to enable OpenTSDB put in HTTP API. |
| `influxdb` | -- | -- | InfluxDB protocol options. |
| `influxdb.enable` | Bool | `true` | Whether to enable InfluxDB protocol in HTTP API. |
| `prom_store` | -- | -- | Prometheus remote storage options |
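With the telnet listener on 4242 gone, OpenTSDB writes go through the HTTP API controlled by the single `opentsdb.enable` switch above. Below is a minimal sketch of such a write in Rust; the endpoint path `/v1/opentsdb/api/put`, the default HTTP port 4000, and the use of `reqwest` are illustrative assumptions rather than part of this diff, and the JSON body follows OpenTSDB's standard `/api/put` shape.

use serde_json::json;

// Hedged sketch: send one data point to the HTTP put endpoint that replaces
// the removed TCP server. Path and port are assumed, not taken from this diff.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = json!([{
        "metric": "my_metric_1",
        "timestamp": 1000,
        "value": 1.0,
        "tags": { "host": "web01" }
    }]);
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/opentsdb/api/put") // assumed endpoint
        .json(&body)
        .send()
        .await?;
    println!("put status: {}", resp.status());
    Ok(())
}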

View File

@@ -88,12 +88,8 @@ watch = false
## OpenTSDB protocol options.
[opentsdb]
## Whether to enable
## Whether to enable OpenTSDB put in HTTP API.
enable = true
## OpenTSDB telnet API server address.
addr = "127.0.0.1:4242"
## The number of server worker threads.
runtime_size = 2
## InfluxDB protocol options.
[influxdb]

View File

@@ -83,12 +83,8 @@ watch = false
## OpenTSDB protocol options.
[opentsdb]
## Whether to enable
## Whether to enable OpenTSDB put in HTTP API.
enable = true
## OpenTSDB telnet API server address.
addr = "127.0.0.1:4242"
## The number of server worker threads.
runtime_size = 2
## InfluxDB protocol options.
[influxdb]

View File

@@ -126,8 +126,6 @@ pub struct StartCommand {
mysql_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(short, long)]
@@ -198,11 +196,6 @@ impl StartCommand {
opts.postgres.tls = tls_opts;
}
if let Some(addr) = &self.opentsdb_addr {
opts.opentsdb.enable = true;
opts.opentsdb.addr.clone_from(addr);
}
if let Some(enable) = self.influxdb_enable {
opts.influxdb.enable = enable;
}
@@ -319,7 +312,6 @@ mod tests {
http_addr: Some("127.0.0.1:1234".to_string()),
mysql_addr: Some("127.0.0.1:5678".to_string()),
postgres_addr: Some("127.0.0.1:5432".to_string()),
opentsdb_addr: Some("127.0.0.1:4321".to_string()),
influxdb_enable: Some(false),
disable_dashboard: Some(false),
..Default::default()
@@ -333,7 +325,6 @@ mod tests {
assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
assert_eq!(opts.opentsdb.addr, "127.0.0.1:4321");
let default_opts = FrontendOptions::default();
@@ -346,10 +337,6 @@ mod tests {
default_opts.postgres.runtime_size
);
assert!(opts.opentsdb.enable);
assert_eq!(
opts.opentsdb.runtime_size,
default_opts.opentsdb.runtime_size
);
assert!(!opts.influxdb.enable);
}
@@ -365,6 +352,9 @@ mod tests {
timeout = "30s"
body_limit = "2GB"
[opentsdb]
enable = false
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
@@ -389,6 +379,7 @@ mod tests {
assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir);
assert!(!fe_opts.opentsdb.enable);
}
#[tokio::test]

View File

@@ -256,8 +256,6 @@ pub struct StartCommand {
mysql_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
opentsdb_addr: Option<String>,
#[clap(short, long)]
influxdb_enable: bool,
#[clap(short, long)]
@@ -343,11 +341,6 @@ impl StartCommand {
opts.postgres.tls = tls_opts;
}
if let Some(addr) = &self.opentsdb_addr {
opts.opentsdb.enable = true;
opts.opentsdb.addr.clone_from(addr);
}
if self.influxdb_enable {
opts.influxdb.enable = self.influxdb_enable;
}
@@ -610,6 +603,9 @@ mod tests {
timeout = "33s"
body_limit = "128MB"
[opentsdb]
enable = true
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
@@ -637,6 +633,7 @@ mod tests {
assert_eq!(2, fe_opts.mysql.runtime_size);
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);
assert!(fe_opts.opentsdb.enable);
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()

View File

@@ -24,7 +24,6 @@ use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
@@ -258,24 +257,6 @@ where
handlers.insert((pg_server, pg_addr)).await;
}
if opts.opentsdb.enable {
// Init OpenTSDB server
let opts = &opts.opentsdb;
let addr = parse_addr(&opts.addr)?;
let io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.runtime_size)
.thread_name("opentsdb-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let server = OpentsdbServer::create_server(instance.clone(), io_runtime);
handlers.insert((server, addr)).await;
}
Ok(handlers)
}
}

View File

@@ -17,16 +17,10 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OpentsdbOptions {
pub enable: bool,
pub addr: String,
pub runtime_size: usize,
}
impl Default for OpentsdbOptions {
fn default() -> Self {
Self {
enable: true,
addr: "127.0.0.1:4242".to_string(),
runtime_size: 2,
}
Self { enable: true }
}
}
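For reference, a standalone sketch of how the slimmed-down struct behaves after this change: `enable` is the only remaining knob and it still defaults to `true`. The struct mirrors the diff above; parsing with the `toml` crate here is illustrative only and not taken from the repo's tests.

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OpentsdbOptions {
    pub enable: bool,
}

impl Default for OpentsdbOptions {
    fn default() -> Self {
        Self { enable: true }
    }
}

fn main() {
    // `addr` and `runtime_size` are gone; only the HTTP-API toggle remains.
    let opts: OpentsdbOptions = toml::from_str("enable = false").unwrap();
    assert!(!opts.enable);
    assert!(OpentsdbOptions::default().enable);
}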

View File

@@ -190,13 +190,6 @@ pub enum Error {
error: hyper::Error,
},
#[snafu(display("Invalid OpenTSDB line"))]
InvalidOpentsdbLine {
#[snafu(source)]
error: FromUtf8Error,
location: Location,
},
#[snafu(display("Invalid OpenTSDB Json request"))]
InvalidOpentsdbJsonRequest {
#[snafu(source)]
@@ -508,7 +501,6 @@ impl ErrorExt for Error {
| InvalidQuery { .. }
| InfluxdbLineProtocol { .. }
| ConnResetByPeer { .. }
| InvalidOpentsdbLine { .. }
| InvalidOpentsdbJsonRequest { .. }
| DecodePromRemoteRequest { .. }
| DecodeOtlpRequest { .. }
@@ -664,7 +656,6 @@ impl IntoResponse for Error {
Error::InfluxdbLineProtocol { .. }
| Error::RowWriter { .. }
| Error::PromSeriesWrite { .. }
| Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::DecodePromRemoteRequest { .. }
| Error::DecodeOtlpRequest { .. }

View File

@@ -42,7 +42,6 @@ pub mod query_handler;
pub mod repeated_field;
mod row_writer;
pub mod server;
mod shutdown;
pub mod tls;
pub use common_config::Mode;

View File

@@ -25,8 +25,8 @@ use axum::response::IntoResponse;
use hyper::Body;
use lazy_static::lazy_static;
use prometheus::{
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
};
use tonic::body::BoxBody;
use tower::{Layer, Service};
@@ -130,11 +130,6 @@ lazy_static! {
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: Histogram = register_histogram!(
"greptime_servers_opentsdb_line_write_elapsed",
"servers opentsdb line write elapsed"
)
.unwrap();
pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
"greptime_servers_mysql_connection_count",
"servers mysql connection count"

View File

@@ -13,127 +13,14 @@
// limitations under the License.
pub mod codec;
pub mod connection;
mod handler;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_runtime::Runtime;
use common_telemetry::logging::{debug, error, warn};
use futures::StreamExt;
use tokio::sync::broadcast;
use self::codec::DataPoint;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::row_writer::{self, MultiTableData};
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;
pub struct OpentsdbServer {
base_server: BaseTcpServer,
query_handler: OpentsdbProtocolHandlerRef,
/// Broadcasts a shutdown signal to all active connections.
///
/// When a connection task is spawned, it is passed a broadcast receiver handle. We can send
/// a `()` value via `notify_shutdown` or just drop `notify_shutdown`, then each active
/// connection receives it, reaches a safe terminal state, and completes the task.
notify_shutdown: Option<broadcast::Sender<()>>,
}
impl OpentsdbServer {
pub fn create_server(
query_handler: OpentsdbProtocolHandlerRef,
io_runtime: Arc<Runtime>,
) -> Box<dyn Server> {
// When the provided `shutdown` future completes, we must send a shutdown
// message to all active connections. We use a broadcast channel for this
// purpose. The call below ignores the receiver of the broadcast pair, and when
// a receiver is needed, the subscribe() method on the sender is used to create
// one.
let (notify_shutdown, _) = broadcast::channel(1);
Box::new(OpentsdbServer {
base_server: BaseTcpServer::create_server("OpenTSDB", io_runtime),
query_handler,
notify_shutdown: Some(notify_shutdown),
})
}
fn accept(
&self,
io_runtime: Arc<Runtime>,
stream: AbortableStream,
) -> impl Future<Output = ()> {
let query_handler = self.query_handler.clone();
let notify_shutdown = self
.notify_shutdown
.clone()
.expect("`notify_shutdown` must be present when accepting connection!");
stream.for_each(move |stream| {
let io_runtime = io_runtime.clone();
let query_handler = query_handler.clone();
let shutdown = Shutdown::new(notify_shutdown.subscribe());
async move {
match stream {
Ok(stream) => {
if let Err(e) = stream.set_nodelay(true) {
warn!(e; "Failed to set TCP nodelay");
}
let connection = Connection::new(stream);
let mut handler = Handler::new(query_handler, connection, shutdown);
let _handle = io_runtime.spawn(async move {
if let Err(e) = handler.run().await {
if e.status_code().should_log_error() {
error!(e; "Unexpected error when handling OpenTSDB connection");
}
}
});
}
Err(error) => debug!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
};
}
})
}
}
pub const OPENTSDB_SERVER: &str = "OPENTSDB_SERVER";
#[async_trait]
impl Server for OpentsdbServer {
async fn shutdown(&self) -> Result<()> {
if let Some(tx) = &self.notify_shutdown {
// Err of broadcast sender does not mean that future calls to send will fail, so
// its return value is ignored here.
let _ = tx.send(());
}
self.base_server.shutdown().await?;
Ok(())
}
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
let (stream, addr) = self.base_server.bind(listening).await?;
let io_runtime = self.base_server.io_runtime();
let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream));
self.base_server.start_with(join_handle).await?;
Ok(addr)
}
fn name(&self) -> &str {
OPENTSDB_SERVER
}
}
pub fn data_point_to_grpc_row_insert_requests(
data_points: Vec<DataPoint>,

View File

@@ -1,205 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Modified from Tokio's mini-redis example.
use bytes::{Buf, BytesMut};
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter};
use crate::error::{self, Result};
type Line = String;
#[derive(Debug)]
pub struct Connection<S: AsyncWrite + AsyncRead + Unpin> {
stream: BufWriter<S>,
buffer: BytesMut,
}
impl<S: AsyncWrite + AsyncRead + Unpin> Connection<S> {
pub fn new(stream: S) -> Connection<S> {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
/// Read one line from the underlying stream.
///
/// The function waits until it has retrieved enough data to parse a line (terminated by \r\n).
/// Any data remaining in the read buffer after the line has been parsed is kept there for the
/// next call to `read_line`.
///
/// # Returns
///
/// On success, the received line is returned. If the stream is closed in a way that
/// doesn't break a line in half, it returns `None`. Otherwise, an error is returned.
pub async fn read_line(&mut self) -> Result<Option<Line>> {
loop {
// Attempt to parse a line from the buffered data. If enough data
// has been buffered, the line is returned.
if let Some(line) = self.parse_line()? {
return Ok(Some(line));
}
// There is not enough buffered data as a line. Attempt to read more from the socket.
// On success, the number of bytes is returned. `0` indicates "end of stream".
if self.stream.read_buf(&mut self.buffer).await? == 0 {
// The remote closed the connection. For this to be a clean shutdown, there should
// be no data in the read buffer. If there is, this means that the peer closed the
// socket while sending a line.
if self.buffer.is_empty() {
return Ok(None);
} else {
return error::ConnResetByPeerSnafu {}.fail();
}
}
}
}
/// Tries to parse a line from the buffer.
///
/// If the buffer contains enough data, the line is returned and the buffered data is removed.
/// If not enough data has been buffered yet, `Ok(None)` is returned.
/// If the buffered data does not represent a valid UTF8 line, `Err` is returned.
fn parse_line(&mut self) -> Result<Option<Line>> {
if self.buffer.is_empty() {
return Ok(None);
}
let buf = &self.buffer[..];
if let Some(pos) = buf.windows(2).position(|w| w == [b'\r', b'\n']) {
let line = buf[0..pos].to_vec();
self.buffer.advance(pos + 2);
Ok(Some(
String::from_utf8(line).context(error::InvalidOpentsdbLineSnafu)?,
))
} else {
// There is not enough data present in the read buffer to parse a single line. We must
// wait for more data to be received from the socket.
Ok(None)
}
}
pub async fn write_line(&mut self, line: String) -> Result<()> {
self.stream
.write_all(line.as_bytes())
.await
.context(error::InternalIoSnafu)?;
let _ = self
.stream
.write(b"\r\n")
.await
.context(error::InternalIoSnafu)?;
self.stream.flush().await.context(error::InternalIoSnafu)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::io::Write;
use bytes::BufMut;
use common_error::ext::ErrorExt;
use tokio_test::io::Builder;
use super::*;
#[tokio::test]
async fn test_read_line() {
let mock = Builder::new()
.read(b"This is")
.read(b" a line.\r\n")
.build();
let mut conn = Connection::new(mock);
let line = conn.read_line().await.unwrap();
assert_eq!(line, Some("This is a line.".to_string()));
let line = conn.read_line().await.unwrap();
assert_eq!(line, None);
let buffer = &mut conn.buffer;
buffer
.writer()
.write_all(b"simulating buffer has remaining data")
.unwrap();
let result = conn.read_line().await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Connection reset by peer"));
}
#[test]
fn test_parse_line() {
let mock = Builder::new().build();
let mut conn = Connection::new(mock);
// initially, no data in the buffer, so return None
let line = conn.parse_line();
assert_matches!(line, Ok(None));
// still has no line, but we have data in the buffer
{
let buffer = &mut conn.buffer;
buffer.writer().write_all(b"This is a ").unwrap();
let line = conn.parse_line();
assert_matches!(line, Ok(None));
}
let buffer = &conn.buffer[..];
assert_eq!(String::from_utf8(buffer.to_vec()).unwrap(), "This is a ");
// finally gets a line, and the buffer has the remaining data
{
let buffer = &mut conn.buffer;
buffer
.writer()
.write_all(b"line.\r\n another line's remaining data")
.unwrap();
let line = conn.parse_line().unwrap();
assert_eq!(line, Some("This is a line.".to_string()));
}
let buffer = &conn.buffer[..];
assert_eq!(
String::from_utf8(buffer.to_vec()).unwrap(),
" another line's remaining data"
);
// expected failed on not valid utf-8 line
let buffer = &mut conn.buffer;
buffer.writer().write_all(b"Hello Wor\xffld.\r\n").unwrap();
let result = conn.parse_line();
assert!(result.is_err());
let err = result.unwrap_err().output_msg();
assert!(err.contains("invalid utf-8 sequence"));
}
#[tokio::test]
async fn test_write_err() {
let mock = Builder::new()
.write(b"An OpenTSDB error.")
.write(b"\r\n")
.build();
let mut conn = Connection::new(mock);
conn.write_line("An OpenTSDB error.".to_string())
.await
.unwrap();
}
}
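The whole file above is deleted along with the TCP server. For context, here is a minimal standalone sketch of the framing technique `parse_line` used — scan the buffer for `\r\n`, cut one line off the front, keep the remainder for the next call — written against the `bytes` crate only and simplified to drop the UTF-8 error handling.

use bytes::{Buf, BytesMut};

fn parse_line(buffer: &mut BytesMut) -> Option<String> {
    // Find the first "\r\n"; without it there is no complete line yet.
    let pos = buffer.windows(2).position(|w| w == [b'\r', b'\n'])?;
    let line = buffer.split_to(pos); // take the line off the front
    buffer.advance(2); // drop the "\r\n" delimiter
    String::from_utf8(line.to_vec()).ok()
}

fn main() {
    let mut buf = BytesMut::from(&b"put 1 1 1\r\npartial"[..]);
    assert_eq!(parse_line(&mut buf).as_deref(), Some("put 1 1 1"));
    assert_eq!(parse_line(&mut buf), None); // "partial" stays buffered for the next read
    assert_eq!(&buf[..], &b"partial"[..]);
}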

View File

@@ -1,205 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Modified from Tokio's mini-redis example.
use common_error::ext::ErrorExt;
use session::context::QueryContextBuilder;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::error::Result;
use crate::opentsdb::codec::DataPoint;
use crate::opentsdb::connection::Connection;
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::shutdown::Shutdown;
/// Per-connection handler. Reads requests from `connection` and applies the OpenTSDB metric to
/// [OpentsdbProtocolHandlerRef].
pub(crate) struct Handler<S: AsyncWrite + AsyncRead + Unpin> {
query_handler: OpentsdbProtocolHandlerRef,
/// The TCP connection decorated with OpenTSDB line protocol encoder / decoder implemented
/// using a buffered `TcpStream`.
///
/// When TCP listener receives an inbound connection, the `TcpStream` is passed to
/// `Connection::new`, which initializes the associated buffers. The byte level protocol
/// parsing details is encapsulated in `Connection`.
connection: Connection<S>,
/// Listen for shutdown notifications.
///
/// A wrapper around the `broadcast::Receiver` paired with the sender in TCP connections
/// listener. The connection handler processes requests from the connection until the peer
/// disconnects **or** a shutdown notification is received from `shutdown`. In the latter case,
/// any in-flight work being processed for the peer is continued until it reaches a safe state,
/// at which point the connection is terminated. (Graceful shutdown.)
shutdown: Shutdown,
}
impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
pub(crate) fn new(
query_handler: OpentsdbProtocolHandlerRef,
connection: Connection<S>,
shutdown: Shutdown,
) -> Self {
Self {
query_handler,
connection,
shutdown,
}
}
pub(crate) async fn run(&mut self) -> Result<()> {
// TODO(shuiyisong): figure out how to auth in tcp connection.
let ctx = QueryContextBuilder::default().build();
while !self.shutdown.is_shutdown() {
// While reading a request, also listen for the shutdown signal.
let maybe_line = tokio::select! {
line = self.connection.read_line() => line?,
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
// This will result in the task terminating.
return Ok(());
}
};
// If `None` is returned from `read_line()` then the peer closed the socket. There is
// no further work to do and the task can be terminated.
let line = match maybe_line {
Some(line) => line,
None => return Ok(()),
};
// Close connection upon receiving "quit" line. With actual OpenTSDB, telnet just won't
// quit, the connection to OpenTSDB server can be closed only via terminating telnet
// session manually, for example, close the terminal window. That is a little annoying,
// so I added "quit" command to the line protocol, to make telnet client able to quit
// gracefully.
if line.trim().eq_ignore_ascii_case("quit") {
return Ok(());
}
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer =
crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED.start_timer();
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.output_msg()).await?;
}
}
Err(e) => {
self.connection.write_line(e.output_msg()).await?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use session::context::QueryContextRef;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc};
use super::*;
use crate::error;
use crate::query_handler::OpentsdbProtocolHandler;
struct DummyQueryHandler {
tx: mpsc::Sender<String>,
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(data_points.len())
}
}
#[tokio::test]
async fn test_run() {
let (tx, mut rx) = mpsc::channel(100);
let query_handler = Arc::new(DummyQueryHandler { tx });
let (notify_shutdown, _) = broadcast::channel(1);
let addr = start_server(query_handler, notify_shutdown).await;
let stream = TcpStream::connect(addr).await.unwrap();
let mut client = Connection::new(stream);
client
.write_line("put my_metric_1 1000 1.0 host=web01".to_string())
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), "my_metric_1");
client
.write_line("put my_metric_2 1000 1.0 host=web01".to_string())
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), "my_metric_2");
client
.write_line("put should_failed 1000 1.0 host=web01".to_string())
.await
.unwrap();
let resp = client.read_line().await.unwrap();
assert_eq!(resp, Some("Internal error: 1003".to_string()));
client.write_line("get".to_string()).await.unwrap();
let resp = client.read_line().await.unwrap();
assert_eq!(
resp,
Some("Invalid query: unknown command get.".to_string())
);
}
async fn start_server(
query_handler: OpentsdbProtocolHandlerRef,
notify_shutdown: broadcast::Sender<()>,
) -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let _handle = common_runtime::spawn_read(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let query_handler = query_handler.clone();
let connection = Connection::new(stream);
let shutdown = Shutdown::new(notify_shutdown.subscribe());
let _handle = common_runtime::spawn_read(async move {
Handler::new(query_handler, connection, shutdown)
.run()
.await
});
}
});
addr
}
}

View File

@@ -1,65 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Copied from tokio's mini-redis example.
use tokio::sync::broadcast;
/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub(crate) struct Shutdown {
/// `true` if the shutdown signal has been received
shutdown: bool,
/// The receive half of the channel used to listen for shutdown.
notify: broadcast::Receiver<()>,
}
impl Shutdown {
/// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
/// Returns `true` if the shutdown signal has been received.
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Receive the shutdown notice, waiting if necessary.
pub(crate) async fn recv(&mut self) {
// If the shutdown signal has already been received, then return
// immediately.
if self.shutdown {
return;
}
// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await;
// Remember that the signal has been received.
self.shutdown = true;
}
}
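`Shutdown` existed only to stop the per-connection TCP tasks, so it is removed with them. For context, a minimal sketch of the broadcast-based pattern it wrapped, using plain tokio rather than repo code: one sender, one subscriber per task, and a single `send(())` (or dropping the sender) wakes every subscriber.

use std::time::Duration;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (notify_shutdown, _) = broadcast::channel::<()>(1);
    let mut shutdown_rx = notify_shutdown.subscribe();

    let worker = tokio::spawn(async move {
        tokio::select! {
            // Only one value is ever sent, so a lag error cannot occur.
            _ = shutdown_rx.recv() => { /* reached a safe point, wind down */ }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {}
        }
    });

    // Signal shutdown and wait for the task to finish gracefully.
    let _ = notify_shutdown.send(());
    worker.await.unwrap();
}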

View File

@@ -40,7 +40,6 @@ mod grpc;
mod http;
mod interceptor;
mod mysql;
mod opentsdb;
mod postgres;
mod py_script;

View File

@@ -1,283 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use common_runtime::Builder as RuntimeBuilder;
use rand::rngs::StdRng;
use rand::Rng;
use servers::error::{self as server_error, Error, Result};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::connection::Connection;
use servers::opentsdb::OpentsdbServer;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::server::Server;
use session::context::QueryContextRef;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Notify};
struct DummyOpentsdbInstance {
tx: mpsc::Sender<i32>,
}
#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(data_points.len())
}
}
fn create_opentsdb_server(tx: mpsc::Sender<i32>) -> Result<Box<dyn Server>> {
let query_handler = Arc::new(DummyOpentsdbInstance { tx });
let io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("opentsdb-io-handlers")
.build()
.unwrap(),
);
Ok(OpentsdbServer::create_server(query_handler, io_runtime))
}
#[tokio::test]
async fn test_start_opentsdb_server() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let result = server.start(listening).await;
let _ = result.unwrap();
let result = server.start(listening).await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server has been started."));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shutdown_opentsdb_server_concurrently() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let server = create_opentsdb_server(tx)?;
let result = server.shutdown().await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server is not started."));
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let notify = Arc::new(Notify::new());
let notify_in_task = notify.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_task = stop.clone();
let stop_timeout = Duration::from_secs(5 * 60);
let join_handle = tokio::spawn(async move {
let mut i = 1;
let mut stop_time = None;
loop {
let stream = TcpStream::connect(addr).await;
match stream {
Ok(stream) => {
let mut connection = Connection::new(stream);
let result = connection.write_line(format!("put {i} 1 1")).await;
i += 1;
if i > 4 {
// Ensure the server has been started.
notify_in_task.notify_one();
}
if let Err(e) = result {
match e {
Error::InternalIo { .. } => return,
_ => panic!("Not IO error, err is {e}"),
}
}
if stop.load(Ordering::Relaxed) {
let du_since_stop = stop_time.get_or_insert_with(Instant::now).elapsed();
if du_since_stop > stop_timeout {
// Avoid hang on test.
panic!("Stop timeout");
}
}
}
Err(_) => return,
}
}
});
notify.notified().await;
server.shutdown().await.unwrap();
stop_task.store(true, Ordering::Relaxed);
join_handle.await.unwrap();
Ok(())
}
#[tokio::test]
async fn test_opentsdb_connection_shutdown() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let server = create_opentsdb_server(tx)?;
let result = server.shutdown().await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server is not started."));
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let stream = TcpStream::connect(addr).await.unwrap();
let mut connection = Connection::new(stream);
connection
.write_line("put 1 1 1".to_string())
.await
.unwrap();
server.shutdown().await.unwrap();
let shutdown_time = Instant::now();
let timeout = Duration::from_secs(5 * 60);
let mut i = 2;
loop {
// The connection may not be unwritable after shutdown immediately.
let result = connection.write_line(format!("put {i} 1 1")).await;
i += 1;
if result.is_err() {
if let Err(e) = result {
match e {
Error::InternalIo { .. } => break,
_ => panic!("Not IO error, err is {e}"),
}
}
}
if shutdown_time.elapsed() > timeout {
panic!("Shutdown timeout");
}
}
Ok(())
}
#[tokio::test]
async fn test_opentsdb_connect_after_shutdown() -> Result<()> {
let (tx, _) = mpsc::channel(100);
let server = create_opentsdb_server(tx)?;
let result = server.shutdown().await;
assert!(result
.unwrap_err()
.to_string()
.contains("OpenTSDB server is not started."));
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
server.shutdown().await.unwrap();
assert!(TcpStream::connect(addr).await.is_err());
Ok(())
}
#[tokio::test]
async fn test_query() -> Result<()> {
let (tx, mut rx) = mpsc::channel(10);
let server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let stream = TcpStream::connect(addr).await.unwrap();
let mut connection = Connection::new(stream);
connection.write_line("put 100 1 1".to_string()).await?;
assert_eq!(rx.recv().await.unwrap(), 10000);
connection
.write_line("foo illegal put line".to_string())
.await
.unwrap();
let result = connection.read_line().await?;
assert_eq!(
result,
Some("Invalid query: unknown command foo.".to_string())
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_query_concurrently() -> Result<()> {
let threads = 4;
let expect_executed_queries_per_worker = 1000;
let (tx, mut rx) = mpsc::channel(threads * expect_executed_queries_per_worker);
let server = create_opentsdb_server(tx)?;
let listening = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let addr = server.start(listening).await?;
let mut join_handles = vec![];
for _ in 0..threads {
join_handles.push(tokio::spawn(async move {
let mut rand: StdRng = rand::SeedableRng::from_entropy();
let stream = TcpStream::connect(addr).await.unwrap();
let mut connection = Connection::new(stream);
for i in 0..expect_executed_queries_per_worker {
connection.write_line(format!("put {i} 1 1")).await.unwrap();
let should_recreate_conn = rand.gen_range(0..100) == 1;
if should_recreate_conn {
let stream = TcpStream::connect(addr).await.unwrap();
connection = Connection::new(stream);
}
}
expect_executed_queries_per_worker
}))
}
let mut total_pending_queries = threads * expect_executed_queries_per_worker;
for handle in join_handles.iter_mut() {
total_pending_queries -= handle.await.unwrap();
}
assert_eq!(0, total_pending_queries);
let mut expected_result: i32 = (threads
* (0..expect_executed_queries_per_worker)
.map(|i| i * i)
.sum::<usize>()) as i32;
while let Some(i) = rx.recv().await {
expected_result -= i;
if expected_result == 0 {
break;
}
}
Ok(())
}

View File

@@ -770,8 +770,6 @@ watch = false
[frontend.opentsdb]
enable = true
addr = "127.0.0.1:4242"
runtime_size = 2
[frontend.influxdb]
enable = true