fix: table and database conflicts (#491)

* fix: table conflicts in different database, #483

* feat: support db query param in prometheus remoting read/write

* feat: support db query param in influxdb line protocol

* fix: make schema_name work in gRPC

* fix: table data path

* fix: table manifest dir

* feat: adds opendal logging layer to object store

* Update src/frontend/src/instance.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/frontend/src/instance.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/servers/src/line_writer.rs

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>

* Update src/servers/src/line_writer.rs

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>

* fix: compile error

* ci: use larger runner for running coverage

* fix: address already in use in test

Co-authored-by: LFC <bayinamine@gmail.com>
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
dennis zhuang
2022-11-14 23:16:52 +08:00
committed by GitHub
parent 76732d6506
commit 448e8f139e
26 changed files with 514 additions and 161 deletions

View File

@@ -329,7 +329,6 @@ impl HttpServer {
router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router);
}
// TODO(fys): Creating influxdb's database when we can create greptime schema.
if let Some(influxdb_handler) = self.influxdb_handler.clone() {
let influxdb_router =
Router::with_state(influxdb_handler).route("/write", routing::post(influxdb_write));

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use crate::error::Result;
@@ -12,14 +13,22 @@ use crate::query_handler::InfluxdbLineProtocolHandlerRef;
#[axum_macros::debug_handler]
pub async fn influxdb_write(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(params): Query<HashMap<String, String>>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<(StatusCode, ())> {
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;
let request = InfluxdbRequest { precision, lines };
let request = InfluxdbRequest {
precision,
lines,
db,
};
handler.exec(&request).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

View File

@@ -1,24 +1,43 @@
use api::prometheus::remote::{ReadRequest, WriteRequest};
use axum::extract::{RawBody, State};
use axum::extract::{Query, RawBody, State};
use axum::http::header;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use hyper::Body;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::prometheus::snappy_decompress;
use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse};
/// Query-string parameters extracted by the Prometheus `remote_write` and
/// `remote_read` handlers.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
/// Name of the target database. When it is `None`, the handlers fall back
/// to `DEFAULT_SCHEMA_NAME`.
pub db: Option<String>,
}
impl Default for DatabaseQuery {
fn default() -> DatabaseQuery {
Self {
db: Some(DEFAULT_SCHEMA_NAME.to_string()),
}
}
}
#[axum_macros::debug_handler]
pub async fn remote_write(
State(handler): State<PrometheusProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
RawBody(body): RawBody,
) -> Result<(StatusCode, ())> {
let request = decode_remote_write_request(body).await?;
handler.write(request).await?;
handler
.write(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
.await?;
Ok((StatusCode::NO_CONTENT, ()))
}
@@ -39,11 +58,14 @@ impl IntoResponse for PrometheusResponse {
#[axum_macros::debug_handler]
pub async fn remote_read(
State(handler): State<PrometheusProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
RawBody(body): RawBody,
) -> Result<PrometheusResponse> {
let request = decode_remote_read_request(body).await?;
handler.read(request).await
handler
.read(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
.await
}
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {

View File

@@ -4,7 +4,6 @@ use api::v1::{
insert_expr::{self, Expr},
InsertExpr,
};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -18,6 +17,7 @@ pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND;
pub struct InfluxdbRequest {
pub precision: Option<Precision>,
pub db: String,
pub lines: String,
}
@@ -32,12 +32,13 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertRequest> {
.context(InfluxdbLineProtocolSnafu)?;
let line_len = lines.len();
let mut writers: HashMap<TableName, LineWriter> = HashMap::new();
let db = &value.db;
for line in lines {
let table_name = line.series.measurement;
let writer = writers
.entry(table_name.to_string())
.or_insert_with(|| LineWriter::with_lines(table_name, line_len));
.or_insert_with(|| LineWriter::with_lines(db, table_name, line_len));
let tags = line.series.tag_set;
if let Some(tags) = tags {
@@ -81,8 +82,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
// InfluxDB uses the default catalog name; the schema name is taken from the request's `db`
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let schema_name = value.db.to_string();
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
@@ -192,12 +192,14 @@ monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
db: "influxdb".to_string(),
precision: None,
lines: lines.to_string(),
};
let insert_reqs: Vec<InsertRequest> = influxdb_req.try_into().unwrap();
for insert_req in insert_reqs {
assert_eq!("influxdb", insert_req.schema_name);
match &insert_req.table_name[..] {
"monitor1" => assert_table_1(&insert_req),
"monitor2" => assert_table_2(&insert_req),
@@ -216,6 +218,7 @@ monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
db: "public".to_string(),
precision: None,
lines: lines.to_string(),
};
@@ -225,6 +228,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
assert_eq!(2, insert_exprs.len());
for expr in insert_exprs {
assert_eq!("public", expr.schema_name);
let values = match expr.expr.unwrap() {
Expr::Values(vals) => vals,
Expr::Sql(_) => panic!(),

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_grpc::writer::{to_ms_ts, Precision};
use common_time::{timestamp::TimeUnit::Millisecond, Timestamp};
use datatypes::{
@@ -15,6 +15,7 @@ type ColumnLen = usize;
type ColumnName = String;
pub struct LineWriter {
db: String,
table_name: String,
expected_rows: usize,
current_rows: usize,
@@ -22,8 +23,9 @@ pub struct LineWriter {
}
impl LineWriter {
pub fn with_lines(table_name: impl Into<String>, lines: usize) -> Self {
pub fn with_lines(db: impl Into<String>, table_name: impl Into<String>, lines: usize) -> Self {
Self {
db: db.into(),
table_name: table_name.into(),
expected_rows: lines,
current_rows: 0,
@@ -122,8 +124,7 @@ impl LineWriter {
.collect();
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
// TODO(dennis): supports database
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
schema_name: self.db,
table_name: self.table_name,
columns_values,
}
@@ -134,6 +135,7 @@ impl LineWriter {
mod tests {
use std::sync::Arc;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_time::Timestamp;
use datatypes::{value::Value, vectors::Vector};
@@ -141,7 +143,7 @@ mod tests {
#[test]
fn test_writer() {
let mut writer = LineWriter::with_lines("demo".to_string(), 4);
let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4);
writer.write_ts("ts", (1665893727685, Precision::MILLISECOND));
writer.write_tag("host", "host-1");
writer.write_i64("memory", 10_i64);
@@ -162,6 +164,7 @@ mod tests {
let insert_request = writer.finish();
assert_eq!("demo", insert_request.table_name);
assert_eq!(DEFAULT_SCHEMA_NAME, insert_request.schema_name);
let columns = insert_request.columns_values;
assert_eq!(5, columns.len());

View File

@@ -117,7 +117,7 @@ impl DataPoint {
}
pub fn as_insert_request(&self) -> InsertRequest {
let mut line_writer = LineWriter::with_lines(self.metric.clone(), 1);
let mut line_writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1);
line_writer.write_ts(
OPENTSDB_TIMESTAMP_COLUMN_NAME,
(self.ts_millis(), Precision::MILLISECOND),

View File

@@ -11,7 +11,6 @@ use api::v1::{
codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType,
InsertExpr,
};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision::MILLISECOND;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
@@ -32,7 +31,7 @@ pub struct Metrics {
/// Generate a SQL statement from a remote request query
/// TODO(dennis): maybe use logical plan in future to prevent sql injection
pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
pub fn query_to_sql(db: &str, q: &Query) -> Result<(String, String)> {
let start_timestamp_ms = q.start_timestamp_ms;
let end_timestamp_ms = q.end_timestamp_ms;
@@ -93,8 +92,8 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
Ok((
table_name.to_string(),
format!(
"select * from {} where {} order by {}",
table_name, conditions, TIMESTAMP_COLUMN_NAME,
"select * from {}.{} where {} order by {}",
db, table_name, conditions, TIMESTAMP_COLUMN_NAME,
),
))
}
@@ -280,16 +279,19 @@ pub fn select_result_to_timeseries(
}
/// Cast a remote write request into InsertRequest
pub fn write_request_to_insert_reqs(mut request: WriteRequest) -> Result<Vec<InsertRequest>> {
pub fn write_request_to_insert_reqs(
db: &str,
mut request: WriteRequest,
) -> Result<Vec<InsertRequest>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(timeseries_to_insert_request)
.map(|timeseries| timeseries_to_insert_request(db, timeseries))
.collect()
}
fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result<InsertRequest> {
fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result<InsertRequest> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
@@ -306,7 +308,7 @@ fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result<InsertRequ
})?;
let row_count = samples.len();
let mut line_writer = LineWriter::with_lines(table_name, row_count);
let mut line_writer = LineWriter::with_lines(db, table_name, row_count);
for sample in samples {
let ts_millis = sample.timestamp;
@@ -329,18 +331,21 @@ fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result<InsertRequ
// TODO(fys): this will be removed in the future.
/// Cast a remote write request into gRPC's InsertExpr.
pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result<Vec<InsertExpr>> {
pub fn write_request_to_insert_exprs(
database: &str,
mut request: WriteRequest,
) -> Result<Vec<InsertExpr>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(timeseries_to_insert_expr)
.map(|timeseries| timeseries_to_insert_expr(database, timeseries))
.collect()
}
// TODO(fys): this will be removed in the future.
fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Result<InsertExpr> {
let schema_name = database.to_string();
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
@@ -518,7 +523,7 @@ mod tests {
matchers: vec![],
..Default::default()
};
let err = query_to_sql(&q).unwrap_err();
let err = query_to_sql("public", &q).unwrap_err();
assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
let q = Query {
@@ -531,9 +536,9 @@ mod tests {
}],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
let (table, sql) = query_to_sql("public", &q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
let q = Query {
start_timestamp_ms: 1000,
@@ -557,9 +562,9 @@ mod tests {
],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
let (table, sql) = query_to_sql("public", &q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
}
#[test]
@@ -569,11 +574,12 @@ mod tests {
..Default::default()
};
let reqs = write_request_to_insert_reqs(write_request).unwrap();
let reqs = write_request_to_insert_reqs("public", write_request).unwrap();
assert_eq!(3, reqs.len());
let req1 = reqs.get(0).unwrap();
assert_eq!("public", req1.schema_name);
assert_eq!("metric1", req1.table_name);
let columns = &req1.columns_values;
@@ -593,6 +599,7 @@ mod tests {
assert_vector(&expected, val);
let req2 = reqs.get(1).unwrap();
assert_eq!("public", req2.schema_name);
assert_eq!("metric2", req2.table_name);
let columns = &req2.columns_values;
@@ -616,6 +623,7 @@ mod tests {
assert_vector(&expected, val);
let req3 = reqs.get(2).unwrap();
assert_eq!("public", req3.schema_name);
assert_eq!("metric3", req3.table_name);
let columns = &req3.columns_values;
@@ -654,8 +662,11 @@ mod tests {
..Default::default()
};
let exprs = write_request_to_insert_exprs(write_request).unwrap();
let exprs = write_request_to_insert_exprs("prometheus", write_request).unwrap();
assert_eq!(3, exprs.len());
assert_eq!("prometheus", exprs[0].schema_name);
assert_eq!("prometheus", exprs[1].schema_name);
assert_eq!("prometheus", exprs[2].schema_name);
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);

View File

@@ -67,9 +67,9 @@ pub struct PrometheusResponse {
#[async_trait]
pub trait PrometheusProtocolHandler {
/// Handling prometheus remote write requests
async fn write(&self, request: WriteRequest) -> Result<()>;
async fn write(&self, database: &str, request: WriteRequest) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse>;
async fn read(&self, database: &str, request: ReadRequest) -> Result<PrometheusResponse>;
/// Handling push gateway requests
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}

View File

@@ -12,7 +12,7 @@ use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
struct DummyInstance {
tx: mpsc::Sender<String>,
tx: mpsc::Sender<(String, String)>,
}
#[async_trait]
@@ -21,7 +21,7 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
let exprs: Vec<InsertExpr> = request.try_into()?;
for expr in exprs {
let _ = self.tx.send(expr.table_name).await;
let _ = self.tx.send((expr.schema_name, expr.table_name)).await;
}
Ok(())
@@ -43,7 +43,7 @@ impl SqlQueryHandler for DummyInstance {
}
}
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
fn make_test_app(tx: mpsc::Sender<(String, String)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
server.set_influxdb_handler(instance);
@@ -66,6 +66,14 @@ async fn test_influxdb_write() {
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let result = client
.post("/v1/influxdb/write?db=influxdb")
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// bad request
let result = client
.post("/v1/influxdb/write")
@@ -79,5 +87,11 @@ async fn test_influxdb_write() {
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(metrics, vec!["monitor".to_string()]);
assert_eq!(
metrics,
vec![
("public".to_string(), "monitor".to_string()),
("influxdb".to_string(), "monitor".to_string())
]
);
}

View File

@@ -17,18 +17,24 @@ use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQ
use tokio::sync::mpsc;
struct DummyInstance {
tx: mpsc::Sender<Vec<u8>>,
tx: mpsc::Sender<(String, Vec<u8>)>,
}
#[async_trait]
impl PrometheusProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest) -> Result<()> {
let _ = self.tx.send(request.encode_to_vec()).await;
async fn write(&self, db: &str, request: WriteRequest) -> Result<()> {
let _ = self
.tx
.send((db.to_string(), request.encode_to_vec()))
.await;
Ok(())
}
async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse> {
let _ = self.tx.send(request.encode_to_vec()).await;
async fn read(&self, db: &str, request: ReadRequest) -> Result<PrometheusResponse> {
let _ = self
.tx
.send((db.to_string(), request.encode_to_vec()))
.await;
let response = ReadResponse {
results: vec![QueryResult {
@@ -63,7 +69,7 @@ impl SqlQueryHandler for DummyInstance {
}
}
fn make_test_app(tx: mpsc::Sender<Vec<u8>>) -> Router {
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
server.set_prom_handler(instance);
@@ -82,6 +88,7 @@ async fn test_prometheus_remote_write_read() {
..Default::default()
};
// Write to public database
let result = client
.post("/v1/prometheus/write")
.body(snappy_compress(&write_request.clone().encode_to_vec()[..]).unwrap())
@@ -89,6 +96,14 @@ async fn test_prometheus_remote_write_read() {
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
// Write to prometheus database
let result = client
.post("/v1/prometheus/write?db=prometheus")
.body(snappy_compress(&write_request.clone().encode_to_vec()[..]).unwrap())
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let read_request = ReadRequest {
queries: vec![Query {
@@ -104,8 +119,9 @@ async fn test_prometheus_remote_write_read() {
..Default::default()
};
// Read from prometheus database
let mut result = client
.post("/v1/prometheus/read")
.post("/v1/prometheus/read?db=prometheus")
.body(snappy_compress(&read_request.clone().encode_to_vec()[..]).unwrap())
.send()
.await;
@@ -127,16 +143,41 @@ async fn test_prometheus_remote_write_read() {
prometheus::mock_timeseries()
);
let mut requests = vec![];
// Read from public database
let result = client
.post("/v1/prometheus/read")
.body(snappy_compress(&read_request.clone().encode_to_vec()[..]).unwrap())
.send()
.await;
assert_eq!(result.status(), 200);
let mut requests: Vec<(String, Vec<u8>)> = vec![];
while let Ok(s) = rx.try_recv() {
requests.push(s);
}
assert_eq!(2, requests.len());
assert_eq!(4, requests.len());
assert_eq!("public", requests[0].0);
assert_eq!("prometheus", requests[1].0);
assert_eq!("prometheus", requests[2].0);
assert_eq!("public", requests[3].0);
assert_eq!(
write_request,
WriteRequest::decode(&requests[0][..]).unwrap()
WriteRequest::decode(&(requests[0].1)[..]).unwrap()
);
assert_eq!(
write_request,
WriteRequest::decode(&(requests[1].1)[..]).unwrap()
);
assert_eq!(
read_request,
ReadRequest::decode(&(requests[2].1)[..]).unwrap()
);
assert_eq!(
read_request,
ReadRequest::decode(&(requests[3].1)[..]).unwrap()
);
assert_eq!(read_request, ReadRequest::decode(&requests[1][..]).unwrap());
}