feat: fix panic in TimeSeriesParquetReader

This commit is contained in:
evenyag
2025-03-10 14:13:37 +08:00
parent 63bc544514
commit 79d249f5fa

View File

@@ -243,32 +243,35 @@ impl TimeSeriesParquetReader {
if let Some(array) = &self.array {
if self.row_index >= array.len() {
let Some(record_batch) = self
.stream
.next()
.await
.transpose()
.context(ReadParquetSnafu { path: &self.path })
.context(MitoSnafu)?
else {
self.eof = true;
return Ok(None);
};
self.row_index = 0;
self.array = Some(
record_batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.context(MissingColumnSnafu {
column_name: "remote write json column",
})?
.clone(),
);
self.array = None;
}
}
if self.array.is_none() {
let Some(record_batch) = self
.stream
.next()
.await
.transpose()
.context(ReadParquetSnafu { path: &self.path })
.context(MitoSnafu)?
else {
self.eof = true;
return Ok(None);
};
self.array = Some(
record_batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.context(MissingColumnSnafu {
column_name: "remote write json column",
})?
.clone(),
);
}
let array = self.array.as_ref().unwrap();
let value = array.value(self.row_index);
let time_series = serde_json::from_str(value).context(JsonSnafu)?;