fix: panic when querying slowlog (#7689)

* fix: panic when querying slowlog

Signed-off-by: luofucong <luofc@foxmail.com>

* docs: fix json_payload() doc comment to reflect structured return type (#7690)

* Initial plan

* docs: update json_payload doc comment to reflect serde_json::Value return type

Co-authored-by: MichaelScofield <990479+MichaelScofield@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: MichaelScofield <990479+MichaelScofield@users.noreply.github.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
LFC
2026-02-09 18:39:48 +08:00
committed by GitHub
parent 39d3744f4f
commit 56629ec563
8 changed files with 30 additions and 25 deletions

1
Cargo.lock generated
View File

@@ -2391,6 +2391,7 @@ dependencies = [
"humantime",
"humantime-serde",
"itertools 0.14.0",
"jsonb",
"serde",
"serde_json",
"snafu 0.8.6",

View File

@@ -15,6 +15,7 @@ common-time.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
jsonb.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

View File

@@ -87,9 +87,9 @@ pub trait Event: Send + Sync + Debug {
Timestamp::current_time(TimeUnit::Nanosecond)
}
/// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload.
fn json_payload(&self) -> Result<String> {
Ok("".to_string())
/// Returns the event payload as a structured JSON value. It will be encoded as JSONB when stored.
fn json_payload(&self) -> Result<serde_json::Value> {
Ok(serde_json::Value::Null)
}
/// Add the extra schema to the event with the default schema.
@@ -164,7 +164,7 @@ pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsert
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::BinaryValue(jsonb::Value::from(&event.json_payload()?).to_vec()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
@@ -438,6 +438,8 @@ impl EventProcessor {
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[derive(Debug)]
@@ -448,8 +450,8 @@ mod tests {
"test_event"
}
fn json_payload(&self) -> Result<String> {
Ok("{\"procedure_id\": \"1234567890\"}".to_string())
fn json_payload(&self) -> Result<serde_json::Value> {
Ok(json!({"procedure_id": "1234567890"}))
}
fn as_any(&self) -> &dyn Any {
@@ -470,7 +472,7 @@ mod tests {
.unwrap();
assert_eq!(
event.json_payload().unwrap(),
"{\"procedure_id\": \"1234567890\"}"
json!({"procedure_id": "1234567890"}),
);
assert_eq!(event.event_type(), "test_event");
Ok(())

View File

@@ -122,10 +122,6 @@ impl Event for SlowQueryEvent {
}])
}
fn json_payload(&self) -> Result<String> {
Ok("".to_string())
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -63,7 +63,7 @@ impl Event for ProcedureEvent {
self.timestamp
}
fn json_payload(&self) -> Result<String> {
fn json_payload(&self) -> Result<serde_json::Value> {
self.internal_event.json_payload()
}

View File

@@ -390,6 +390,9 @@ impl Display for JsonType {
/// Converts a json type value to string
pub fn jsonb_to_string(val: &[u8]) -> Result<String> {
if val.is_empty() {
return Ok("".to_string());
}
match jsonb::from_slice(val) {
Ok(jsonb_value) => {
let serialized = jsonb_value.to_string();

View File

@@ -153,8 +153,8 @@ impl Event for RegionMigrationEvent {
Ok(extra_rows)
}
fn json_payload(&self) -> Result<String> {
serde_json::to_string(&Payload {
fn json_payload(&self) -> Result<serde_json::Value> {
serde_json::to_value(Payload {
timeout: self.timeout,
})
.context(SerializeEventSnafu)

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::time::Duration;
use arrow::array::{Array, AsArray};
@@ -124,7 +125,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
pub async fn try_write_one(
self,
output: Result<Output>,
) -> Result<Option<MysqlResultWriter<'a, W>>> {
) -> io::Result<Option<MysqlResultWriter<'a, W>>> {
// We don't support sending multiple query result because the RowWriter's lifetime is bound to
// a local variable.
match output {
@@ -168,7 +169,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
w: QueryResultWriter<'a, W>,
rows: usize,
session: &SessionRef,
) -> Result<QueryResultWriter<'a, W>> {
) -> io::Result<QueryResultWriter<'a, W>> {
let warnings = session.warnings_count() as u16;
let next_writer = w
@@ -185,7 +186,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
mut query_result: QueryResult,
writer: QueryResultWriter<'a, W>,
query_context: QueryContextRef,
) -> Result<()> {
) -> io::Result<()> {
match create_mysql_column_def(&query_result.schema) {
Ok(column_def) => {
// The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
@@ -194,13 +195,18 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
while let Some(record_batch) = query_result.stream.next().await {
match record_batch {
Ok(record_batch) => {
Self::write_recordbatch(
if let Err(e) = Self::write_recordbatch(
&mut row_writer,
record_batch,
query_context.clone(),
&query_result.schema,
)
.await?
.await
{
let (kind, err) = handle_err(e, query_context);
row_writer.finish_error(kind, &err.as_bytes()).await?;
return Ok(());
}
}
Err(e) => {
let (kind, err) = handle_err(e, query_context);
@@ -325,11 +331,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
datatypes::arrow_array::duration_array_value(column, i).into();
row_writer.write_col(v)?;
}
DataType::List(_) => {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
}
DataType::Struct(_) => {
DataType::List(_) | DataType::Struct(_) => {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
}
@@ -360,7 +362,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
error: impl ErrorExt,
w: QueryResultWriter<'a, W>,
query_context: QueryContextRef,
) -> Result<()> {
) -> io::Result<()> {
METRIC_ERROR_COUNTER
.with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
.inc();