diff --git a/Cargo.lock b/Cargo.lock index cfb3169a2e..94ef17bca2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2391,6 +2391,7 @@ dependencies = [ "humantime", "humantime-serde", "itertools 0.14.0", + "jsonb", "serde", "serde_json", "snafu 0.8.6", diff --git a/src/common/event-recorder/Cargo.toml b/src/common/event-recorder/Cargo.toml index f9ca25c40c..33997115e4 100644 --- a/src/common/event-recorder/Cargo.toml +++ b/src/common/event-recorder/Cargo.toml @@ -15,6 +15,7 @@ common-time.workspace = true humantime.workspace = true humantime-serde.workspace = true itertools.workspace = true +jsonb.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs index ace7702991..eb014c638f 100644 --- a/src/common/event-recorder/src/recorder.rs +++ b/src/common/event-recorder/src/recorder.rs @@ -87,9 +87,9 @@ pub trait Event: Send + Sync + Debug { Timestamp::current_time(TimeUnit::Nanosecond) } - /// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload. - fn json_payload(&self) -> Result { - Ok("".to_string()) + /// Returns the event payload as a structured JSON value. It will be encoded as JSONB when stored. + fn json_payload(&self) -> Result { + Ok(serde_json::Value::Null) } /// Add the extra schema to the event with the default schema. @@ -164,7 +164,7 @@ pub fn build_row_inserts_request(events: &[&Box]) -> Result Result { - Ok("{\"procedure_id\": \"1234567890\"}".to_string()) + fn json_payload(&self) -> Result { + Ok(json!({"procedure_id": "1234567890"})) } fn as_any(&self) -> &dyn Any { @@ -470,7 +472,7 @@ mod tests { .unwrap(); assert_eq!( event.json_payload().unwrap(), - "{\"procedure_id\": \"1234567890\"}" + json!({"procedure_id": "1234567890"}), ); assert_eq!(event.event_type(), "test_event"); Ok(()) diff --git a/src/common/frontend/src/slow_query_event.rs b/src/common/frontend/src/slow_query_event.rs index 32ca457da4..f9a2620283 100644 --- a/src/common/frontend/src/slow_query_event.rs +++ b/src/common/frontend/src/slow_query_event.rs @@ -122,10 +122,6 @@ impl Event for SlowQueryEvent { }]) } - fn json_payload(&self) -> Result { - Ok("".to_string()) - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/common/procedure/src/event.rs b/src/common/procedure/src/event.rs index d659236369..82e39f5900 100644 --- a/src/common/procedure/src/event.rs +++ b/src/common/procedure/src/event.rs @@ -63,7 +63,7 @@ impl Event for ProcedureEvent { self.timestamp } - fn json_payload(&self) -> Result { + fn json_payload(&self) -> Result { self.internal_event.json_payload() } diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 67c3ec951e..61586fc460 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -390,6 +390,9 @@ impl Display for JsonType { /// Converts a json type value to string pub fn jsonb_to_string(val: &[u8]) -> Result { + if val.is_empty() { + return Ok("".to_string()); + } match jsonb::from_slice(val) { Ok(jsonb_value) => { let serialized = jsonb_value.to_string(); diff --git a/src/meta-srv/src/events/region_migration_event.rs b/src/meta-srv/src/events/region_migration_event.rs index 7e5c5b6fc2..fd1d81ff33 100644 --- a/src/meta-srv/src/events/region_migration_event.rs +++ b/src/meta-srv/src/events/region_migration_event.rs @@ -153,8 +153,8 @@ impl Event for RegionMigrationEvent { Ok(extra_rows) } - fn json_payload(&self) -> Result { - serde_json::to_string(&Payload { + fn json_payload(&self) -> Result { + serde_json::to_value(Payload { timeout: self.timeout, }) .context(SerializeEventSnafu) diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index f0b6425788..5c9b9c7ef7 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; use std::time::Duration; use arrow::array::{Array, AsArray}; @@ -124,7 +125,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { pub async fn try_write_one( self, output: Result, - ) -> Result>> { + ) -> io::Result>> { // We don't support sending multiple query result because the RowWriter's lifetime is bound to // a local variable. match output { @@ -168,7 +169,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { w: QueryResultWriter<'a, W>, rows: usize, session: &SessionRef, - ) -> Result> { + ) -> io::Result> { let warnings = session.warnings_count() as u16; let next_writer = w @@ -185,7 +186,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { mut query_result: QueryResult, writer: QueryResultWriter<'a, W>, query_context: QueryContextRef, - ) -> Result<()> { + ) -> io::Result<()> { match create_mysql_column_def(&query_result.schema) { Ok(column_def) => { // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one() @@ -194,13 +195,18 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { while let Some(record_batch) = query_result.stream.next().await { match record_batch { Ok(record_batch) => { - Self::write_recordbatch( + if let Err(e) = Self::write_recordbatch( &mut row_writer, record_batch, query_context.clone(), &query_result.schema, ) - .await? + .await + { + let (kind, err) = handle_err(e, query_context); + row_writer.finish_error(kind, &err.as_bytes()).await?; + return Ok(()); + } } Err(e) => { let (kind, err) = handle_err(e, query_context); @@ -325,11 +331,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { datatypes::arrow_array::duration_array_value(column, i).into(); row_writer.write_col(v)?; } - DataType::List(_) => { - let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?; - row_writer.write_col(v.to_string())?; - } - DataType::Struct(_) => { + DataType::List(_) | DataType::Struct(_) => { let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?; row_writer.write_col(v.to_string())?; } @@ -360,7 +362,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { error: impl ErrorExt, w: QueryResultWriter<'a, W>, query_context: QueryContextRef, - ) -> Result<()> { + ) -> io::Result<()> { METRIC_ERROR_COUNTER .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL]) .inc();