fix: use catalog from connection (#1099)

* fix: using schema instead of full database

* fix: using schema instead of full database

* fix: using schema instead of full database

* chore: add debug log

* chore: remove debug log

* chore: remove debug log

* chore: fix cr
Authored by shuiyisong on 2023-03-01 10:34:57 +08:00; committed by GitHub
parent 8cd69f441e
commit dc50095af3
9 changed files with 46 additions and 19 deletions
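
In short: the protocol handlers (InfluxDB, OpenTSDB, Prometheus remote read/write) used to pin every request to DEFAULT_CATALOG_NAME and treat the client-supplied database name as a schema only. This change threads a QueryContextRef from the connection into OpentsdbProtocolHandler::exec and derives both catalog and schema from the database name via parse_catalog_and_schema_from_client_database_name. That helper is only imported in this diff, not defined by it; a minimal sketch of its presumed behavior, assuming a `<catalog>-<schema>` naming convention:

use common_catalog::consts::DEFAULT_CATALOG_NAME;

// Hypothetical reconstruction, not part of this diff: split a client
// database name such as "my_catalog-my_schema" into (catalog, schema).
// A name without the assumed '-' separator keeps the default catalog.
pub fn parse_catalog_and_schema_from_client_database_name(db: &str) -> (&str, &str) {
    match db.split_once('-') {
        Some((catalog, schema)) => (catalog, schema),
        None => (DEFAULT_CATALOG_NAME, db),
    }
}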

View File

@@ -17,16 +17,16 @@ use common_error::prelude::BoxedError;
 use servers::error as server_error;
 use servers::opentsdb::codec::DataPoint;
 use servers::query_handler::OpentsdbProtocolHandler;
-use session::context::QueryContext;
+use session::context::QueryContextRef;
 use snafu::prelude::*;

 use crate::instance::Instance;

 #[async_trait]
 impl OpentsdbProtocolHandler for Instance {
-    async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> {
+    async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
         let request = data_point.as_grpc_insert();
-        self.handle_insert(request, QueryContext::arc())
+        self.handle_insert(request, ctx)
             .await
             .map_err(BoxedError::new)
             .with_context(|_| server_error::ExecuteQuerySnafu {
@@ -66,6 +66,7 @@ mod tests {
     }

     async fn test_exec(instance: &Arc<Instance>) {
+        let ctx = QueryContext::arc();
         let data_point1 = DataPoint::new(
             "my_metric_1".to_string(),
             1000,
@@ -76,7 +77,7 @@ mod tests {
             ],
         );

         // should create new table "my_metric_1" directly
-        let result = instance.exec(&data_point1).await;
+        let result = instance.exec(&data_point1, ctx.clone()).await;
         assert!(result.is_ok());

         let data_point2 = DataPoint::new(
@@ -89,12 +90,12 @@ mod tests {
             ],
         );

         // should create new column "tagk3" directly
-        let result = instance.exec(&data_point2).await;
+        let result = instance.exec(&data_point2, ctx.clone()).await;
         assert!(result.is_ok());

         let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
         // should handle null tags properly
-        let result = instance.exec(&data_point3).await;
+        let result = instance.exec(&data_point3, ctx.clone()).await;
         assert!(result.is_ok());

         let output = instance

View File

@@ -18,12 +18,13 @@ use std::sync::Arc;
 use axum::extract::{Query, State};
 use axum::http::StatusCode;
 use axum::response::IntoResponse;
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_grpc::writer::Precision;
 use session::context::QueryContext;

 use crate::error::{Result, TimePrecisionSnafu};
 use crate::influxdb::InfluxdbRequest;
+use crate::parse_catalog_and_schema_from_client_database_name;
 use crate::query_handler::InfluxdbLineProtocolHandlerRef;

 // https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint
@@ -47,7 +48,8 @@ pub async fn influxdb_write(
     let db = params
         .remove("db")
         .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
-    let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db));
+    let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
+    let ctx = Arc::new(QueryContext::with(catalog, schema));

     let precision = params
         .get("precision")

View File

@@ -13,16 +13,20 @@
 // limitations under the License.

 use std::collections::HashMap;
+use std::sync::Arc;

 use axum::extract::{Query, RawBody, State};
 use axum::http::StatusCode as HttpStatusCode;
 use axum::Json;
+use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use hyper::Body;
 use serde::{Deserialize, Serialize};
+use session::context::QueryContext;
 use snafu::ResultExt;

 use crate::error::{self, Error, Result};
 use crate::opentsdb::codec::DataPoint;
+use crate::parse_catalog_and_schema_from_client_database_name;
 use crate::query_handler::OpentsdbProtocolHandlerRef;

 #[derive(Serialize, Deserialize)]
@@ -81,11 +85,19 @@ pub async fn put(
     let summary = params.contains_key("summary");
     let details = params.contains_key("details");

+    let db = params
+        .get("db")
+        .map(|v| v.as_str())
+        .unwrap_or(DEFAULT_SCHEMA_NAME);
+    let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(db);
+    let ctx = Arc::new(QueryContext::with(catalog, schema));
+
     let data_points = parse_data_points(body).await?;

     let response = if !summary && !details {
         for data_point in data_points.into_iter() {
-            if let Err(e) = opentsdb_handler.exec(&data_point.into()).await {
+            if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
                 // Not debugging purpose, failed fast.
                 return error::InternalSnafu {
                     err_msg: e.to_string(),
@@ -106,7 +118,9 @@ pub async fn put(
         };

         for data_point in data_points.into_iter() {
-            let result = opentsdb_handler.exec(&data_point.clone().into()).await;
+            let result = opentsdb_handler
+                .exec(&data_point.clone().into(), ctx.clone())
+                .await;
             match result {
                 Ok(()) => response.on_success(),
                 Err(e) => {

View File

@@ -18,7 +18,7 @@ use api::prometheus::remote::{ReadRequest, WriteRequest};
 use axum::extract::{Query, RawBody, State};
 use axum::http::{header, StatusCode};
 use axum::response::IntoResponse;
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use hyper::Body;
 use prost::Message;
 use schemars::JsonSchema;
@@ -27,6 +27,7 @@ use session::context::QueryContext;
 use snafu::prelude::*;

 use crate::error::{self, Result};
+use crate::parse_catalog_and_schema_from_client_database_name;
 use crate::prometheus::snappy_decompress;
 use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse};
@@ -52,11 +53,13 @@ pub async fn remote_write(
     let request = decode_remote_write_request(body).await?;

     let ctx = if let Some(db) = params.db {
-        Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db))
+        let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
+        Arc::new(QueryContext::with(catalog, schema))
     } else {
         QueryContext::arc()
     };
+    // TODO(shuiyisong): add more error log
     handler.write(request, ctx).await?;

     Ok((StatusCode::NO_CONTENT, ()))
 }
@@ -83,11 +86,13 @@ pub async fn remote_read(
     let request = decode_remote_read_request(body).await?;

     let ctx = if let Some(db) = params.db {
-        Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db))
+        let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
+        Arc::new(QueryContext::with(catalog, schema))
     } else {
         QueryContext::arc()
     };
+    // TODO(shuiyisong): add more error log
     handler.read(request, ctx).await
 }

View File

@@ -206,7 +206,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
         let context = self.session.context();
         context.set_current_catalog(catalog);
-        context.set_current_schema(database);
+        context.set_current_schema(schema);

         w.ok().await.map_err(|e| e.into())
     }

View File

@@ -14,6 +14,7 @@
 //! Modified from Tokio's mini-redis example.

+use session::context::QueryContext;
 use tokio::io::{AsyncRead, AsyncWrite};

 use crate::error::Result;
@@ -59,6 +60,8 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
     }

     pub(crate) async fn run(&mut self) -> Result<()> {
+        // TODO(shuiyisong): figure out how to auth in tcp connection.
+        let ctx = QueryContext::arc();
         while !self.shutdown.is_shutdown() {
             // While reading a request, also listen for the shutdown signal.
             let maybe_line = tokio::select! {
@@ -88,7 +91,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
             match DataPoint::try_create(&line) {
                 Ok(data_point) => {
-                    let result = self.query_handler.exec(&data_point).await;
+                    let result = self.query_handler.exec(&data_point, ctx.clone()).await;
                     if let Err(e) = result {
                         self.connection.write_line(e.to_string()).await?;
                     }
@@ -108,6 +111,7 @@ mod tests {
     use std::sync::Arc;

     use async_trait::async_trait;
+    use session::context::QueryContextRef;
     use tokio::net::{TcpListener, TcpStream};
     use tokio::sync::{broadcast, mpsc};
@@ -121,7 +125,7 @@ mod tests {

     #[async_trait]
     impl OpentsdbProtocolHandler for DummyQueryHandler {
-        async fn exec(&self, data_point: &DataPoint) -> Result<()> {
+        async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
             let metric = data_point.metric();
             if metric == "should_failed" {
                 return error::InternalSnafu {

View File

@@ -65,7 +65,7 @@ pub trait InfluxdbLineProtocolHandler {
 pub trait OpentsdbProtocolHandler {
     /// A successful request will not return a response.
     /// Only on error will the socket return a line of data.
-    async fn exec(&self, data_point: &DataPoint) -> Result<()>;
+    async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
 }

 pub struct PrometheusResponse {
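
Every implementor of the trait now takes the context explicitly; a minimal conforming implementation might look like the following (handler name hypothetical, mirroring the test dummies below):

struct NoopOpentsdbHandler;

#[async_trait]
impl OpentsdbProtocolHandler for NoopOpentsdbHandler {
    // The connection-scoped context arrives as an argument instead of
    // being fabricated inside the handler with QueryContext::arc().
    async fn exec(&self, _data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
        Ok(())
    }
}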

View File

@@ -34,7 +34,7 @@ struct DummyInstance {

 #[async_trait]
 impl OpentsdbProtocolHandler for DummyInstance {
-    async fn exec(&self, data_point: &DataPoint) -> Result<()> {
+    async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
         if data_point.metric() == "should_failed" {
             return error::InternalSnafu {
                 err_msg: "expected",

View File

@@ -27,6 +27,7 @@ use servers::opentsdb::connection::Connection;
 use servers::opentsdb::OpentsdbServer;
 use servers::query_handler::OpentsdbProtocolHandler;
 use servers::server::Server;
+use session::context::QueryContextRef;
 use tokio::net::TcpStream;
 use tokio::sync::{mpsc, Notify};
@@ -36,7 +37,7 @@ struct DummyOpentsdbInstance {

 #[async_trait]
 impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
-    async fn exec(&self, data_point: &DataPoint) -> Result<()> {
+    async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
         let metric = data_point.metric();
         if metric == "should_failed" {
             return server_error::InternalSnafu {