feat: redact secrets in sql when logging (#2141)

Author: LFC
Committed by: GitHub
Date: 2023-08-14 14:40:00 +08:00
Parent: d0b3607633
Commit: 606b489d53
7 changed files with 63 additions and 46 deletions
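In short: before a failing SQL statement is written to the error log, it is now passed through sql::util::redact_sql_secrets so that credentials embedded in the statement (ACCESS_KEY_ID and SECRET_ACCESS_KEY values) are masked. A minimal sketch of the intended behavior, based on the helper and the unit test added below; the standalone main function and the example statement are illustrative only, not part of the commit:

// Illustrative sketch: redact a statement before logging it.
use sql::util::redact_sql_secrets;

fn main() {
    let stmt = r#"COPY 'my_table' FROM '/test.orc' CONNECTION(ACCESS_KEY_ID='my_key_id', SECRET_ACCESS_KEY="my_access_key");"#;
    // Secret values are replaced with "******" before the statement reaches any log output.
    println!("Failed to execute query: {}", redact_sql_secrets(stmt));
}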

Cargo.lock (generated)

@@ -9221,6 +9221,7 @@ dependencies = [
  "hex",
  "itertools 0.10.5",
  "once_cell",
+ "regex",
  "snafu",
  "sqlparser 0.34.0",
 ]


@@ -44,7 +44,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
 use common_meta::key::TableMetadataManager;
 use common_query::Output;
 use common_telemetry::logging::{debug, info};
-use common_telemetry::timer;
+use common_telemetry::{error, timer};
 use datanode::instance::sql::table_idents_to_full_name;
 use datanode::instance::InstanceRef as DnInstanceRef;
 use datatypes::schema::Schema;
@@ -524,6 +524,9 @@ impl SqlQueryHandler for Instance {
                     results.push(output_result);
                 }
                 Err(e) => {
+                    let redacted = sql::util::redact_sql_secrets(query.as_ref());
+                    error!(e; "Failed to execute query: {redacted}");
+
                     results.push(Err(e));
                     break;
                 }


@@ -15,7 +15,7 @@ use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use ::auth::{Identity, Password, UserProviderRef};
 use async_trait::async_trait;
@@ -23,7 +23,7 @@ use chrono::{NaiveDate, NaiveDateTime};
 use common_catalog::parse_catalog_and_schema_from_db_string;
 use common_error::ext::ErrorExt;
 use common_query::Output;
-use common_telemetry::{error, info, logging, timer, warn};
+use common_telemetry::{error, logging, timer, warn};
 use datatypes::prelude::ConcreteDataType;
 use metrics::increment_counter;
 use opensrv_mysql::{
@@ -91,27 +91,16 @@ impl MysqlInstanceShim {
     }
 
     async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
-        let trace_id = query_ctx.trace_id();
-        info!("Start executing query: '{}'", query);
-        let start = Instant::now();
-
-        let output = if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone())
-        {
+        if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone()) {
             vec![Ok(output)]
         } else {
+            let trace_id = query_ctx.trace_id();
             common_telemetry::TRACE_ID
                 .scope(trace_id, async move {
                     self.query_handler.do_query(query, query_ctx).await
                 })
                 .await
-        };
-
-        info!(
-            "Finished executing query: '{}', total time costs in microseconds: {}",
-            query,
-            start.elapsed().as_micros()
-        );
-
-        output
+        }
     }
 
     /// Execute the logical plan and return the output
@@ -287,7 +276,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
             Some(sql_plan) => sql_plan,
         };
 
-        let (query, outputs) = match sql_plan.plan {
+        let outputs = match sql_plan.plan {
            Some(plan) => {
                 let param_types = plan
                     .get_param_types()
@@ -301,23 +290,19 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
                 }
 
                 let plan = replace_params_with_values(&plan, param_types, params)?;
                 logging::debug!("Mysql execute prepared plan: {}", plan.display_indent());
-                let outputs = vec![
+                vec![
                     self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone())
                         .await,
-                ];
-
-                (sql_plan.query, outputs)
+                ]
             }
             None => {
                 let query = replace_params(params, sql_plan.query);
                 logging::debug!("Mysql execute replaced query: {}", query);
-                let outputs = self.do_query(&query, query_ctx.clone()).await;
-
-                (query, outputs)
+                self.do_query(&query, query_ctx.clone()).await
             }
         };
 
-        writer::write_output(w, &query, query_ctx, outputs).await?;
+        writer::write_output(w, query_ctx, outputs).await?;
 
         Ok(())
     }
@@ -347,7 +332,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
             ]
         );
 
         let outputs = self.do_query(query, query_ctx.clone()).await;
-        writer::write_output(writer, query, query_ctx, outputs).await?;
+        writer::write_output(writer, query_ctx, outputs).await?;
 
         Ok(())
     }


@@ -16,7 +16,6 @@ use std::ops::Deref;
 
 use common_query::Output;
 use common_recordbatch::{util, RecordBatch};
-use common_telemetry::warn;
 use datatypes::prelude::{ConcreteDataType, Value};
 use datatypes::schema::SchemaRef;
 use metrics::increment_counter;
@@ -31,9 +30,8 @@ use crate::error::{self, Error, Result};
 use crate::metrics::*;
 
 /// Try to write multiple output to the writer if possible.
-pub async fn write_output<'a, W: AsyncWrite + Send + Sync + Unpin>(
-    w: QueryResultWriter<'a, W>,
-    query: &str,
+pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
+    w: QueryResultWriter<'_, W>,
     query_context: QueryContextRef,
     outputs: Vec<Result<Output>>,
 ) -> Result<()> {
@@ -42,7 +40,7 @@ pub async fn write_output<'a, W: AsyncWrite + Send + Sync + Unpin>(
         let result_writer = writer.take().context(error::InternalSnafu {
             err_msg: "Sending multiple result set is unsupported",
         })?;
-        writer = result_writer.try_write_one(query, output).await?;
+        writer = result_writer.try_write_one(output).await?;
     }
 
     if let Some(result_writer) = writer {
@@ -75,7 +73,6 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
     /// Try to write one result set. If there are more than one result set, return `Some`.
     pub async fn try_write_one(
         self,
-        query: &str,
         output: Result<Output>,
     ) -> Result<Option<MysqlResultWriter<'a, W>>> {
         // We don't support sending multiple query result because the RowWriter's lifetime is bound to
@@ -91,16 +88,14 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                         recordbatches,
                         schema,
                     };
-                    Self::write_query_result(query, query_result, self.writer, self.query_context)
-                        .await?;
+                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
                 }
                 Output::RecordBatches(recordbatches) => {
                     let query_result = QueryResult {
                         schema: recordbatches.schema(),
                         recordbatches: recordbatches.take(),
                     };
-                    Self::write_query_result(query, query_result, self.writer, self.query_context)
-                        .await?;
+                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
                 }
                 Output::AffectedRows(rows) => {
                     let next_writer = Self::write_affected_rows(self.writer, rows).await?;
@@ -110,7 +105,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     )));
                 }
             },
-            Err(error) => Self::write_query_error(query, error, self.writer).await?,
+            Err(error) => Self::write_query_error(error, self.writer).await?,
         }
         Ok(None)
     }
@@ -135,7 +130,6 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
     }
 
     async fn write_query_result(
-        query: &str,
         query_result: QueryResult,
         writer: QueryResultWriter<'a, W>,
         query_context: QueryContextRef,
@@ -152,7 +146,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                 row_writer.finish().await?;
                 Ok(())
             }
-            Err(error) => Self::write_query_error(query, error, writer).await,
+            Err(error) => Self::write_query_error(error, writer).await,
         }
     }
@@ -200,12 +194,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
         Ok(())
     }
 
-    async fn write_query_error(
-        query: &str,
-        error: Error,
-        w: QueryResultWriter<'a, W>,
-    ) -> Result<()> {
-        warn!(error; "Failed to execute query '{}'", query);
+    async fn write_query_error(error: Error, w: QueryResultWriter<'a, W>) -> Result<()> {
         increment_counter!(
             METRIC_ERROR_COUNTER,
             &[(METRIC_PROTOCOL_LABEL, METRIC_ERROR_COUNTER_LABEL_MYSQL)]


@@ -16,6 +16,7 @@ datatypes = { workspace = true }
 hex = "0.4"
 itertools.workspace = true
 once_cell.workspace = true
+regex.workspace = true
 snafu = { version = "0.7", features = ["backtraces"] }
 sqlparser.workspace = true


@@ -14,6 +14,7 @@
 #![feature(box_patterns)]
 #![feature(assert_matches)]
 #![feature(let_chains)]
+#![feature(lazy_cell)]
 
 pub mod ast;
 pub mod dialect;


@@ -13,9 +13,18 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::LazyLock;
 
+use regex::Regex;
 use sqlparser::ast::{SqlOption, Value};
 
+static SQL_SECRET_PATTERNS: LazyLock<Vec<Regex>> = LazyLock::new(|| {
+    vec![
+        Regex::new(r#"(?i)access_key_id=["'](\w*)["'].*"#).unwrap(),
+        Regex::new(r#"(?i)secret_access_key=["'](\w*)["'].*"#).unwrap(),
+    ]
+});
+
 pub fn parse_option_string(value: Value) -> Option<String> {
     match value {
         Value::SingleQuotedString(v) | Value::DoubleQuotedString(v) => Some(v),
@@ -36,3 +45,31 @@ pub fn to_lowercase_options_map(opts: &[SqlOption]) -> HashMap<String, String> {
     }
     map
 }
+
+/// Use regex to match and replace common seen secret values in SQL.
+pub fn redact_sql_secrets(sql: &str) -> String {
+    let mut s = sql.to_string();
+    for p in SQL_SECRET_PATTERNS.iter() {
+        if let Some(captures) = p.captures(&s) {
+            if let Some(m) = captures.get(1) {
+                s = s.replace(m.as_str(), "******");
+            }
+        }
+    }
+    s
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_redact_sql_secrets() {
+        assert_eq!(
+            redact_sql_secrets(
+                r#"COPY 'my_table' FROM '/test.orc' WITH (FORMAT = 'orc') CONNECTION(ENDPOINT = 's3.storage.site', REGION = 'hz', ACCESS_KEY_ID='my_key_id', SECRET_ACCESS_KEY="my_access_key");"#
+            ),
+            r#"COPY 'my_table' FROM '/test.orc' WITH (FORMAT = 'orc') CONNECTION(ENDPOINT = 's3.storage.site', REGION = 'hz', ACCESS_KEY_ID='******', SECRET_ACCESS_KEY="******");"#
+        );
+    }
+}
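A hypothetical extra check, not part of the commit, illustrating that the (?i) flag makes both patterns case-insensitive, so lowercase option names are redacted the same way (written as if it lived alongside the test module above):

#[test]
fn test_redact_lowercase_keys() {
    // Hypothetical test; relies only on redact_sql_secrets as defined above.
    assert_eq!(
        redact_sql_secrets("CONNECTION(REGION='hz', access_key_id='abc123')"),
        "CONNECTION(REGION='hz', access_key_id='******')"
    );
}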