From 430ffe0e2877e2212e83068a03e3cbcf62d0e00f Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 12 Jan 2024 12:46:17 +0900
Subject: [PATCH] fix(kafka): overwrite the EntryId with Offset while consuming
 records (#3148)

* fix(kafka): overwrite the EntryId with Offset while consuming the KafkaRecords

* fix: temporary workaround for incorrect entry Id
---
 src/log-store/src/kafka/log_store.rs | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index f4bf35d691..80f57ea13c 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -205,7 +205,11 @@ impl LogStore for KafkaLogStore {
             }
 
             // Tries to construct an entry from records consumed so far.
-            if let Some(entry) = maybe_emit_entry(record, &mut entry_records)? {
+            if let Some(mut entry) = maybe_emit_entry(record, &mut entry_records)? {
+                // We don't rely on the EntryId generated by mito2.
+                // Instead, we use the offset returned from Kafka as the EntryId.
+                // Therefore, we MUST overwrite the EntryId with the record offset.
+                entry.id = offset as u64;
                 yield Ok(vec![entry]);
             }
 
@@ -423,17 +427,20 @@ mod tests {
         // Reads entries for regions and checks for each region that the gotten entries are identical with the expected ones.
         for region_id in which {
-            let ctx = &region_contexts[&region_id];
+            let ctx = region_contexts.get_mut(&region_id).unwrap();
             let stream = logstore
                 .read(&ctx.ns, ctx.flushed_entry_id + 1)
                 .await
                 .unwrap();
-            let got = stream
+            let mut got = stream
                 .collect::<Vec<_>>()
                 .await
                 .into_iter()
                 .flat_map(|x| x.unwrap())
                 .collect::<Vec<_>>();
+            // FIXME(weny): https://github.com/GreptimeTeam/greptimedb/issues/3152
+            ctx.expected.iter_mut().for_each(|entry| entry.id = 0);
+            got.iter_mut().for_each(|entry| entry.id = 0);
             assert_eq!(ctx.expected, got);
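
Note: the snippet below is a minimal standalone sketch of the idea behind the first hunk, not GreptimeDB's actual types or API; `Entry`, `ConsumedRecord`, and `assign_ids_from_offsets` are hypothetical stand-ins. It only illustrates what `entry.id = offset as u64` achieves: the producer-assigned entry id is discarded and the Kafka offset observed at consume time becomes the entry id.

// Minimal sketch with hypothetical stand-in types (not the GreptimeDB API):
// the id carried by the decoded entry is ignored and replaced with the Kafka
// offset observed while consuming, so entry ids come from the broker rather
// than from the producer.

#[derive(Debug, PartialEq)]
struct Entry {
    id: u64,
    data: Vec<u8>,
}

struct ConsumedRecord {
    offset: i64,
    entry: Entry,
}

/// Overwrites each entry's id with the offset of the record it was decoded from.
fn assign_ids_from_offsets(records: Vec<ConsumedRecord>) -> Vec<Entry> {
    records
        .into_iter()
        .map(|record| {
            let mut entry = record.entry;
            // The producer-assigned id is not trusted; the Kafka offset serves
            // as the authoritative, monotonically increasing entry id.
            entry.id = record.offset as u64;
            entry
        })
        .collect()
}

fn main() {
    let consumed = vec![ConsumedRecord {
        offset: 42,
        entry: Entry { id: 7, data: b"payload".to_vec() },
    }];
    let entries = assign_ids_from_offsets(consumed);
    assert_eq!(entries[0].id, 42);
}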