diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs index 9fec7a12ca..654f6e278e 100644 --- a/src/sst-convert/src/reader/remote_write.rs +++ b/src/sst-convert/src/reader/remote_write.rs @@ -243,32 +243,35 @@ impl TimeSeriesParquetReader { if let Some(array) = &self.array { if self.row_index >= array.len() { - let Some(record_batch) = self - .stream - .next() - .await - .transpose() - .context(ReadParquetSnafu { path: &self.path }) - .context(MitoSnafu)? - else { - self.eof = true; - return Ok(None); - }; - self.row_index = 0; - self.array = Some( - record_batch - .column(0) - .as_any() - .downcast_ref::() - .context(MissingColumnSnafu { - column_name: "remote write json column", - })? - .clone(), - ); + self.array = None; } } + if self.array.is_none() { + let Some(record_batch) = self + .stream + .next() + .await + .transpose() + .context(ReadParquetSnafu { path: &self.path }) + .context(MitoSnafu)? + else { + self.eof = true; + return Ok(None); + }; + self.array = Some( + record_batch + .column(0) + .as_any() + .downcast_ref::() + .context(MissingColumnSnafu { + column_name: "remote write json column", + })? + .clone(), + ); + } + let array = self.array.as_ref().unwrap(); let value = array.value(self.row_index); let time_series = serde_json::from_str(value).context(JsonSnafu)?;