From 606b489d532c4964e565b732c3cef0b0fdb8bfb9 Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 14 Aug 2023 14:40:00 +0800 Subject: [PATCH] feat: redact secrets in sql when logging (#2141) --- Cargo.lock | 1 + src/frontend/src/instance.rs | 5 ++++- src/servers/src/mysql/handler.rs | 37 ++++++++++---------------------- src/servers/src/mysql/writer.rs | 27 +++++++---------------- src/sql/Cargo.toml | 1 + src/sql/src/lib.rs | 1 + src/sql/src/util.rs | 37 ++++++++++++++++++++++++++++++++ 7 files changed, 63 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96bb529ff0..4cc0cbc49e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9221,6 +9221,7 @@ dependencies = [ "hex", "itertools 0.10.5", "once_cell", + "regex", "snafu", "sqlparser 0.34.0", ] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 576c431ee6..00cbe4e0be 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -44,7 +44,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::key::TableMetadataManager; use common_query::Output; use common_telemetry::logging::{debug, info}; -use common_telemetry::timer; +use common_telemetry::{error, timer}; use datanode::instance::sql::table_idents_to_full_name; use datanode::instance::InstanceRef as DnInstanceRef; use datatypes::schema::Schema; @@ -524,6 +524,9 @@ impl SqlQueryHandler for Instance { results.push(output_result); } Err(e) => { + let redacted = sql::util::redact_sql_secrets(query.as_ref()); + error!(e; "Failed to execute query: {redacted}"); + results.push(Err(e)); break; } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index fb6d793fef..61668a8e01 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use ::auth::{Identity, Password, UserProviderRef}; use async_trait::async_trait; @@ -23,7 +23,7 @@ use chrono::{NaiveDate, NaiveDateTime}; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::{error, info, logging, timer, warn}; +use common_telemetry::{error, logging, timer, warn}; use datatypes::prelude::ConcreteDataType; use metrics::increment_counter; use opensrv_mysql::{ @@ -91,27 +91,16 @@ impl MysqlInstanceShim { } async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - let trace_id = query_ctx.trace_id(); - info!("Start executing query: '{}'", query); - let start = Instant::now(); - - let output = if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone()) - { + if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone()) { vec![Ok(output)] } else { + let trace_id = query_ctx.trace_id(); common_telemetry::TRACE_ID .scope(trace_id, async move { self.query_handler.do_query(query, query_ctx).await }) .await - }; - - info!( - "Finished executing query: '{}', total time costs in microseconds: {}", - query, - start.elapsed().as_micros() - ); - output + } } /// Execute the logical plan and return the output @@ -287,7 +276,7 @@ impl AsyncMysqlShim for MysqlInstanceShi Some(sql_plan) => sql_plan, }; - let (query, outputs) = match sql_plan.plan { + let outputs = match sql_plan.plan { Some(plan) => { let param_types = plan .get_param_types() @@ -301,23 +290,19 @@ impl AsyncMysqlShim for MysqlInstanceShi } let plan = replace_params_with_values(&plan, param_types, params)?; logging::debug!("Mysql execute prepared plan: {}", plan.display_indent()); - let outputs = vec![ + vec![ self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone()) .await, - ]; - - (sql_plan.query, outputs) + ] } None => { let query = replace_params(params, sql_plan.query); logging::debug!("Mysql execute replaced query: {}", query); - let outputs = self.do_query(&query, query_ctx.clone()).await; - - (query, outputs) + self.do_query(&query, query_ctx.clone()).await } }; - writer::write_output(w, &query, query_ctx, outputs).await?; + writer::write_output(w, query_ctx, outputs).await?; Ok(()) } @@ -347,7 +332,7 @@ impl AsyncMysqlShim for MysqlInstanceShi ] ); let outputs = self.do_query(query, query_ctx.clone()).await; - writer::write_output(writer, query, query_ctx, outputs).await?; + writer::write_output(writer, query_ctx, outputs).await?; Ok(()) } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 83a802dcd3..3a4ac34872 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -16,7 +16,6 @@ use std::ops::Deref; use common_query::Output; use common_recordbatch::{util, RecordBatch}; -use common_telemetry::warn; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; use metrics::increment_counter; @@ -31,9 +30,8 @@ use crate::error::{self, Error, Result}; use crate::metrics::*; /// Try to write multiple output to the writer if possible. -pub async fn write_output<'a, W: AsyncWrite + Send + Sync + Unpin>( - w: QueryResultWriter<'a, W>, - query: &str, +pub async fn write_output( + w: QueryResultWriter<'_, W>, query_context: QueryContextRef, outputs: Vec>, ) -> Result<()> { @@ -42,7 +40,7 @@ pub async fn write_output<'a, W: AsyncWrite + Send + Sync + Unpin>( let result_writer = writer.take().context(error::InternalSnafu { err_msg: "Sending multiple result set is unsupported", })?; - writer = result_writer.try_write_one(query, output).await?; + writer = result_writer.try_write_one(output).await?; } if let Some(result_writer) = writer { @@ -75,7 +73,6 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { /// Try to write one result set. If there are more than one result set, return `Some`. pub async fn try_write_one( self, - query: &str, output: Result, ) -> Result>> { // We don't support sending multiple query result because the RowWriter's lifetime is bound to @@ -91,16 +88,14 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { recordbatches, schema, }; - Self::write_query_result(query, query_result, self.writer, self.query_context) - .await?; + Self::write_query_result(query_result, self.writer, self.query_context).await?; } Output::RecordBatches(recordbatches) => { let query_result = QueryResult { schema: recordbatches.schema(), recordbatches: recordbatches.take(), }; - Self::write_query_result(query, query_result, self.writer, self.query_context) - .await?; + Self::write_query_result(query_result, self.writer, self.query_context).await?; } Output::AffectedRows(rows) => { let next_writer = Self::write_affected_rows(self.writer, rows).await?; @@ -110,7 +105,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { ))); } }, - Err(error) => Self::write_query_error(query, error, self.writer).await?, + Err(error) => Self::write_query_error(error, self.writer).await?, } Ok(None) } @@ -135,7 +130,6 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { } async fn write_query_result( - query: &str, query_result: QueryResult, writer: QueryResultWriter<'a, W>, query_context: QueryContextRef, @@ -152,7 +146,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { row_writer.finish().await?; Ok(()) } - Err(error) => Self::write_query_error(query, error, writer).await, + Err(error) => Self::write_query_error(error, writer).await, } } @@ -200,12 +194,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Ok(()) } - async fn write_query_error( - query: &str, - error: Error, - w: QueryResultWriter<'a, W>, - ) -> Result<()> { - warn!(error; "Failed to execute query '{}'", query); + async fn write_query_error(error: Error, w: QueryResultWriter<'a, W>) -> Result<()> { increment_counter!( METRIC_ERROR_COUNTER, &[(METRIC_PROTOCOL_LABEL, METRIC_ERROR_COUNTER_LABEL_MYSQL)] diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 93f41e081a..ee85f93616 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -16,6 +16,7 @@ datatypes = { workspace = true } hex = "0.4" itertools.workspace = true once_cell.workspace = true +regex.workspace = true snafu = { version = "0.7", features = ["backtraces"] } sqlparser.workspace = true diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index ef9ef453fc..3fe8d29532 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -14,6 +14,7 @@ #![feature(box_patterns)] #![feature(assert_matches)] #![feature(let_chains)] +#![feature(lazy_cell)] pub mod ast; pub mod dialect; diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 0ad23221a3..f6e98a8648 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -13,9 +13,18 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::LazyLock; +use regex::Regex; use sqlparser::ast::{SqlOption, Value}; +static SQL_SECRET_PATTERNS: LazyLock> = LazyLock::new(|| { + vec![ + Regex::new(r#"(?i)access_key_id=["'](\w*)["'].*"#).unwrap(), + Regex::new(r#"(?i)secret_access_key=["'](\w*)["'].*"#).unwrap(), + ] +}); + pub fn parse_option_string(value: Value) -> Option { match value { Value::SingleQuotedString(v) | Value::DoubleQuotedString(v) => Some(v), @@ -36,3 +45,31 @@ pub fn to_lowercase_options_map(opts: &[SqlOption]) -> HashMap { } map } + +/// Use regex to match and replace common seen secret values in SQL. +pub fn redact_sql_secrets(sql: &str) -> String { + let mut s = sql.to_string(); + for p in SQL_SECRET_PATTERNS.iter() { + if let Some(captures) = p.captures(&s) { + if let Some(m) = captures.get(1) { + s = s.replace(m.as_str(), "******"); + } + } + } + s +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_redact_sql_secrets() { + assert_eq!( + redact_sql_secrets( + r#"COPY 'my_table' FROM '/test.orc' WITH (FORMAT = 'orc') CONNECTION(ENDPOINT = 's3.storage.site', REGION = 'hz', ACCESS_KEY_ID='my_key_id', SECRET_ACCESS_KEY="my_access_key");"# + ), + r#"COPY 'my_table' FROM '/test.orc' WITH (FORMAT = 'orc') CONNECTION(ENDPOINT = 's3.storage.site', REGION = 'hz', ACCESS_KEY_ID='******', SECRET_ACCESS_KEY="******");"# + ); + } +}