From 158d84ea30c65bcffdd6f9ae73aefedc9be912b3 Mon Sep 17 00:00:00 2001
From: Dmitry Savelev
Date: Tue, 24 Jun 2025 17:36:36 +0200
Subject: [PATCH] Switch the billing metrics storage format to ndjson. (#12338)

## Problem

The billing team wants to change the billing events pipeline and use a common
events format in S3 buckets across different event producers.

## Summary of changes

Change the events storage format for billing events from JSON to NDJSON.

Resolves: https://github.com/neondatabase/cloud/issues/29994
---
 proxy/src/usage_metrics.rs | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs
index 115b958c54..c82c4865a7 100644
--- a/proxy/src/usage_metrics.rs
+++ b/proxy/src/usage_metrics.rs
@@ -399,7 +399,7 @@ async fn collect_metrics_iteration(
 
 fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
     format!(
-        "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
+        "year={year:04}/month={month:02}/day={day:02}/hour={hour:02}/{hour:02}:{minute:02}:{second:02}Z",
         year = now.year(),
         month = now.month(),
         day = now.day(),
@@ -461,7 +461,7 @@ async fn upload_backup_events(
         real_now.second().into(),
         real_now.nanosecond(),
     ));
-    let path = format!("{path_prefix}_{id}.json.gz");
+    let path = format!("{path_prefix}_{id}.ndjson.gz");
    let remote_path = match RemotePath::from_string(&path) {
         Ok(remote_path) => remote_path,
         Err(e) => {
@@ -471,9 +471,12 @@
 
     // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
     // Use sync compression in blocking threadpool.
-    let data = serde_json::to_vec(chunk).context("serialize metrics")?;
     let mut encoder = GzipEncoder::new(Vec::new());
-    encoder.write_all(&data).await.context("compress metrics")?;
+    for event in chunk.events.iter() {
+        let data = serde_json::to_vec(event).context("serialize metrics")?;
+        encoder.write_all(&data).await.context("compress metrics")?;
+        encoder.write_all(b"\n").await.context("compress metrics")?;
+    }
     encoder.shutdown().await.context("compress metrics")?;
     let compressed_data: Bytes = encoder.get_ref().clone().into();
     backoff::retry(
@@ -499,7 +502,7 @@
 #[cfg(test)]
 mod tests {
     use std::fs;
-    use std::io::BufReader;
+    use std::io::{BufRead, BufReader};
     use std::sync::{Arc, Mutex};
 
     use anyhow::Error;
@@ -673,11 +676,22 @@ mod tests {
         {
             let path = local_fs_path.join(&path_prefix).to_string();
             if entry.path().to_str().unwrap().starts_with(&path) {
-                let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
-                    BufReader::new(fs::File::open(entry.into_path()).unwrap()),
-                ))
-                .unwrap();
-                stored_chunks.push(chunk);
+                let file = fs::File::open(entry.into_path()).unwrap();
+                let decoder = flate2::bufread::GzDecoder::new(BufReader::new(file));
+                let reader = BufReader::new(decoder);
+
+                let mut events: Vec<Event<Ids, String>> = Vec::new();
+                for line in reader.lines() {
+                    let line = line.unwrap();
+                    let event: Event<Ids, String> = serde_json::from_str(&line).unwrap();
+                    events.push(event);
+                }
+
+                let report = Report {
+                    events: Cow::Owned(events),
+                };
+
+                stored_chunks.push(report);
             }
         }
         storage_test_dir.close().ok();
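
For reference, with the new `hour=` component in the Hive-style key layout, an uploaded object key now looks like `year=2025/month=06/day=24/hour=15/15:36:36Z_<uuid>.ndjson.gz`, where the uuid is the id generated in `upload_backup_events`.

Below is a minimal, self-contained sketch of the NDJSON-over-gzip round trip this patch introduces. The `Event` struct and the sample values are stand-ins, and compression runs synchronously through `flate2` on both sides; the actual proxy code streams events through `async_compression`'s `GzipEncoder` and uses the real consumption-metrics event types.

```rust
use std::io::{BufRead, BufReader, Write};

use flate2::bufread::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};

// Stand-in for the real consumption-metrics event type.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Event {
    metric: String,
    value: u64,
}

fn main() -> anyhow::Result<()> {
    // Illustrative sample data.
    let events = vec![
        Event { metric: "proxy_io_bytes_per_client".into(), value: 10 },
        Event { metric: "proxy_io_bytes_per_client".into(), value: 20 },
    ];

    // Write: one JSON document per line, then gzip the concatenated stream.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    for event in &events {
        serde_json::to_writer(&mut encoder, event)?;
        encoder.write_all(b"\n")?;
    }
    let compressed = encoder.finish()?;

    // Read: gunzip, then parse each line as an independent JSON document,
    // mirroring the deserialization loop in the updated test.
    let reader = BufReader::new(GzDecoder::new(&compressed[..]));
    let decoded = reader
        .lines()
        .map(|line| Ok(serde_json::from_str::<Event>(&line?)?))
        .collect::<anyhow::Result<Vec<_>>>()?;

    assert_eq!(decoded, events);
    Ok(())
}
```

The practical difference from the old single-JSON-array format is that each line is a complete JSON document, so downstream consumers can decode events one line at a time without buffering or parsing the whole object.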