mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 04:40:39 +00:00
fix: clone data instead of moving it - homemade future is dangerous (#3542)
* fix: clone data instead of moving it - homemade future is dangerous Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * add comment --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -255,18 +255,21 @@ impl Stream for SeriesDivideStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if let Some(batch) = self.buffer.take() {
|
||||
// It has to be cloned here, otherwise the later ready! will mess things up
|
||||
if let Some(batch) = self.buffer.clone() {
|
||||
let same_length = self.find_first_diff_row(&batch) + 1;
|
||||
if same_length >= batch.num_rows() {
|
||||
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
|
||||
Some(Ok(next_batch)) => next_batch,
|
||||
Some(Ok(batch)) => batch,
|
||||
None => {
|
||||
self.buffer = None;
|
||||
self.num_series += 1;
|
||||
return Poll::Ready(Some(Ok(batch)));
|
||||
}
|
||||
error => return Poll::Ready(error),
|
||||
};
|
||||
let new_batch = compute::concat_batches(&batch.schema(), &[batch, next_batch])?;
|
||||
let new_batch =
|
||||
compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?;
|
||||
self.buffer = Some(new_batch);
|
||||
continue;
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user