From dc50095af397bd54fe588ea90a03102bc4cd2600 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 1 Mar 2023 10:34:57 +0800 Subject: [PATCH] fix: use catalog from connection (#1099) * fix: using schema instead of full database * fix: using schema instead of full database * fix: using schema instead of full database * chore: add debug log * chore: remove debug log * chore: remove debug log * chore: fix cr --- src/frontend/src/instance/opentsdb.rs | 13 +++++++------ src/servers/src/http/influxdb.rs | 6 ++++-- src/servers/src/http/opentsdb.rs | 18 ++++++++++++++++-- src/servers/src/http/prometheus.rs | 11 ++++++++--- src/servers/src/mysql/handler.rs | 2 +- src/servers/src/opentsdb/handler.rs | 8 ++++++-- src/servers/src/query_handler.rs | 2 +- src/servers/tests/http/opentsdb_test.rs | 2 +- src/servers/tests/opentsdb.rs | 3 ++- 9 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index d27e59c8cb..3a5f6b9a5a 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -17,16 +17,16 @@ use common_error::prelude::BoxedError; use servers::error as server_error; use servers::opentsdb::codec::DataPoint; use servers::query_handler::OpentsdbProtocolHandler; -use session::context::QueryContext; +use session::context::QueryContextRef; use snafu::prelude::*; use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { - async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> { + async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> { let request = data_point.as_grpc_insert(); - self.handle_insert(request, QueryContext::arc()) + self.handle_insert(request, ctx) .await .map_err(BoxedError::new) .with_context(|_| server_error::ExecuteQuerySnafu { @@ -66,6 +66,7 @@ mod tests { } async fn test_exec(instance: &Arc) { + let ctx = QueryContext::arc(); let data_point1 = DataPoint::new( "my_metric_1".to_string(), 1000, @@ -76,7 +77,7 @@ mod tests { ], ); // should create new table "my_metric_1" directly - let result = instance.exec(&data_point1).await; + let result = instance.exec(&data_point1, ctx.clone()).await; assert!(result.is_ok()); let data_point2 = DataPoint::new( @@ -89,12 +90,12 @@ mod tests { ], ); // should create new column "tagk3" directly - let result = instance.exec(&data_point2).await; + let result = instance.exec(&data_point2, ctx.clone()).await; assert!(result.is_ok()); let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); // should handle null tags properly - let result = instance.exec(&data_point3).await; + let result = instance.exec(&data_point3, ctx.clone()).await; assert!(result.is_ok()); let output = instance diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index e912874ee9..e7d265e284 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -18,12 +18,13 @@ use std::sync::Arc; use axum::extract::{Query, State}; use axum::http::StatusCode; use axum::response::IntoResponse; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision; use session::context::QueryContext; use crate::error::{Result, TimePrecisionSnafu}; use crate::influxdb::InfluxdbRequest; +use crate::parse_catalog_and_schema_from_client_database_name; use crate::query_handler::InfluxdbLineProtocolHandlerRef; // https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint @@ -47,7 +48,8 @@ pub async fn influxdb_write( let db = params .remove("db") .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); - let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db)); + let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db); + let ctx = Arc::new(QueryContext::with(catalog, schema)); let precision = params .get("precision") diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index 5bda9047d9..566253dfb9 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -13,16 +13,20 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use axum::extract::{Query, RawBody, State}; use axum::http::StatusCode as HttpStatusCode; use axum::Json; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use hyper::Body; use serde::{Deserialize, Serialize}; +use session::context::QueryContext; use snafu::ResultExt; use crate::error::{self, Error, Result}; use crate::opentsdb::codec::DataPoint; +use crate::parse_catalog_and_schema_from_client_database_name; use crate::query_handler::OpentsdbProtocolHandlerRef; #[derive(Serialize, Deserialize)] @@ -81,11 +85,19 @@ pub async fn put( let summary = params.contains_key("summary"); let details = params.contains_key("details"); + let db = params + .get("db") + .map(|v| v.as_str()) + .unwrap_or(DEFAULT_SCHEMA_NAME); + + let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(db); + let ctx = Arc::new(QueryContext::with(catalog, schema)); + let data_points = parse_data_points(body).await?; let response = if !summary && !details { for data_point in data_points.into_iter() { - if let Err(e) = opentsdb_handler.exec(&data_point.into()).await { + if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await { // Not debugging purpose, failed fast. return error::InternalSnafu { err_msg: e.to_string(), @@ -106,7 +118,9 @@ pub async fn put( }; for data_point in data_points.into_iter() { - let result = opentsdb_handler.exec(&data_point.clone().into()).await; + let result = opentsdb_handler + .exec(&data_point.clone().into(), ctx.clone()) + .await; match result { Ok(()) => response.on_success(), Err(e) => { diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index db66144fed..555bb56b50 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -18,7 +18,7 @@ use api::prometheus::remote::{ReadRequest, WriteRequest}; use axum::extract::{Query, RawBody, State}; use axum::http::{header, StatusCode}; use axum::response::IntoResponse; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use hyper::Body; use prost::Message; use schemars::JsonSchema; @@ -27,6 +27,7 @@ use session::context::QueryContext; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::parse_catalog_and_schema_from_client_database_name; use crate::prometheus::snappy_decompress; use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse}; @@ -52,11 +53,13 @@ pub async fn remote_write( let request = decode_remote_write_request(body).await?; let ctx = if let Some(db) = params.db { - Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db)) + let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db); + Arc::new(QueryContext::with(catalog, schema)) } else { QueryContext::arc() }; + // TODO(shuiyisong): add more error log handler.write(request, ctx).await?; Ok((StatusCode::NO_CONTENT, ())) } @@ -83,11 +86,13 @@ pub async fn remote_read( let request = decode_remote_read_request(body).await?; let ctx = if let Some(db) = params.db { - Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db)) + let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db); + Arc::new(QueryContext::with(catalog, schema)) } else { QueryContext::arc() }; + // TODO(shuiyisong): add more error log handler.read(request, ctx).await } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 284b801dbb..f3d3f35190 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -206,7 +206,7 @@ impl AsyncMysqlShim for MysqlInstanceShi let context = self.session.context(); context.set_current_catalog(catalog); - context.set_current_schema(database); + context.set_current_schema(schema); w.ok().await.map_err(|e| e.into()) } diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs index f729b73212..b3f9989727 100644 --- a/src/servers/src/opentsdb/handler.rs +++ b/src/servers/src/opentsdb/handler.rs @@ -14,6 +14,7 @@ //! Modified from Tokio's mini-redis example. +use session::context::QueryContext; use tokio::io::{AsyncRead, AsyncWrite}; use crate::error::Result; @@ -59,6 +60,8 @@ impl Handler { } pub(crate) async fn run(&mut self) -> Result<()> { + // TODO(shuiyisong): figure out how to auth in tcp connection. + let ctx = QueryContext::arc(); while !self.shutdown.is_shutdown() { // While reading a request, also listen for the shutdown signal. let maybe_line = tokio::select! { @@ -88,7 +91,7 @@ impl Handler { match DataPoint::try_create(&line) { Ok(data_point) => { - let result = self.query_handler.exec(&data_point).await; + let result = self.query_handler.exec(&data_point, ctx.clone()).await; if let Err(e) = result { self.connection.write_line(e.to_string()).await?; } @@ -108,6 +111,7 @@ mod tests { use std::sync::Arc; use async_trait::async_trait; + use session::context::QueryContextRef; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc}; @@ -121,7 +125,7 @@ mod tests { #[async_trait] impl OpentsdbProtocolHandler for DummyQueryHandler { - async fn exec(&self, data_point: &DataPoint) -> Result<()> { + async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { let metric = data_point.metric(); if metric == "should_failed" { return error::InternalSnafu { diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index d94cdb1c0a..da5eae3938 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -65,7 +65,7 @@ pub trait InfluxdbLineProtocolHandler { pub trait OpentsdbProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. - async fn exec(&self, data_point: &DataPoint) -> Result<()>; + async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>; } pub struct PrometheusResponse { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 498168d60a..e9894804ec 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -34,7 +34,7 @@ struct DummyInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { - async fn exec(&self, data_point: &DataPoint) -> Result<()> { + async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { if data_point.metric() == "should_failed" { return error::InternalSnafu { err_msg: "expected", diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs index aa3d9a5e7c..69a90d08ca 100644 --- a/src/servers/tests/opentsdb.rs +++ b/src/servers/tests/opentsdb.rs @@ -27,6 +27,7 @@ use servers::opentsdb::connection::Connection; use servers::opentsdb::OpentsdbServer; use servers::query_handler::OpentsdbProtocolHandler; use servers::server::Server; +use session::context::QueryContextRef; use tokio::net::TcpStream; use tokio::sync::{mpsc, Notify}; @@ -36,7 +37,7 @@ struct DummyOpentsdbInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyOpentsdbInstance { - async fn exec(&self, data_point: &DataPoint) -> Result<()> { + async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { let metric = data_point.metric(); if metric == "should_failed" { return server_error::InternalSnafu {