Switch the billing metrics storage format to ndjson. (#12338)

## Problem

The billing team wants to change the billing events pipeline and use a
common events format in S3 buckets across different event producers.

## Summary of changes

Change the events storage format for billing events from JSON to NDJSON.

Resolves: https://github.com/neondatabase/cloud/issues/29994
This commit is contained in:
Dmitry Savelev
2025-06-24 17:36:36 +02:00
committed by GitHub
parent 4dd9ca7b04
commit 158d84ea30

View File

@@ -399,7 +399,7 @@ async fn collect_metrics_iteration(
fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
format!(
"year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
"year={year:04}/month={month:02}/day={day:02}/hour={hour:02}/{hour:02}:{minute:02}:{second:02}Z",
year = now.year(),
month = now.month(),
day = now.day(),
@@ -461,7 +461,7 @@ async fn upload_backup_events(
real_now.second().into(),
real_now.nanosecond(),
));
let path = format!("{path_prefix}_{id}.json.gz");
let path = format!("{path_prefix}_{id}.ndjson.gz");
let remote_path = match RemotePath::from_string(&path) {
Ok(remote_path) => remote_path,
Err(e) => {
@@ -471,9 +471,12 @@ async fn upload_backup_events(
// TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
// Use sync compression in blocking threadpool.
let data = serde_json::to_vec(chunk).context("serialize metrics")?;
let mut encoder = GzipEncoder::new(Vec::new());
encoder.write_all(&data).await.context("compress metrics")?;
for event in chunk.events.iter() {
let data = serde_json::to_vec(event).context("serialize metrics")?;
encoder.write_all(&data).await.context("compress metrics")?;
encoder.write_all(b"\n").await.context("compress metrics")?;
}
encoder.shutdown().await.context("compress metrics")?;
let compressed_data: Bytes = encoder.get_ref().clone().into();
backoff::retry(
@@ -499,7 +502,7 @@ async fn upload_backup_events(
#[cfg(test)]
mod tests {
use std::fs;
use std::io::BufReader;
use std::io::{BufRead, BufReader};
use std::sync::{Arc, Mutex};
use anyhow::Error;
@@ -673,11 +676,22 @@ mod tests {
{
let path = local_fs_path.join(&path_prefix).to_string();
if entry.path().to_str().unwrap().starts_with(&path) {
let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
BufReader::new(fs::File::open(entry.into_path()).unwrap()),
))
.unwrap();
stored_chunks.push(chunk);
let file = fs::File::open(entry.into_path()).unwrap();
let decoder = flate2::bufread::GzDecoder::new(BufReader::new(file));
let reader = BufReader::new(decoder);
let mut events: Vec<Event<Extra, String>> = Vec::new();
for line in reader.lines() {
let line = line.unwrap();
let event: Event<Extra, String> = serde_json::from_str(&line).unwrap();
events.push(event);
}
let report = Report {
events: Cow::Owned(events),
};
stored_chunks.push(report);
}
}
storage_test_dir.close().ok();