Switch the billing metrics storage format to ndjson. (#12427)

## Problem
The billing team wants to change the billing events pipeline to use a
common events format in S3 buckets across different event producers.

## Summary of changes
Change the storage format for billing events from JSON to NDJSON.
Also partition files by hour rather than by day.
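Concretely, each uploaded (gzipped) object previously held a single JSON document wrapping all events under an `events` key, and now holds one JSON object per line. A minimal sketch of the difference (the event payloads below are hypothetical stand-ins, not the real billing event schema):

```rust
use serde_json::json;

fn main() -> serde_json::Result<()> {
    // Hypothetical stand-ins for the real billing metric events.
    let events = vec![json!({"metric": "a"}), json!({"metric": "b"})];

    // Old format: one JSON document wrapping all events.
    let old_body = serde_json::to_string(&json!({ "events": events.clone() }))?;
    assert_eq!(old_body, r#"{"events":[{"metric":"a"},{"metric":"b"}]}"#);

    // New format: NDJSON, one JSON object per line.
    let mut new_body = String::new();
    for event in &events {
        new_body.push_str(&serde_json::to_string(event)?);
        new_body.push('\n');
    }
    assert_eq!(new_body, "{\"metric\":\"a\"}\n{\"metric\":\"b\"}\n");
    Ok(())
}
```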

Resolves: https://github.com/neondatabase/cloud/issues/29995
Author: Dmitry Savelev
Date: 2025-07-02 18:30:47 +02:00
Committed by: GitHub
Parent: d6beb3ffbb
Commit: 0429a0db16
2 changed files with 127 additions and 4 deletions

@@ -99,7 +99,7 @@ pub(super) async fn upload_metrics_bucket(
    // Compose object path
    let datetime: DateTime<Utc> = SystemTime::now().into();
-    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
+    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ");
    let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;

    // Set up a gzip writer into a buffer
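The hunk above adds an `hour=` segment to the object key, partitioning uploads by hour. A minimal sketch of the resulting key (the timestamp and node id are hypothetical):

```rust
use chrono::{DateTime, Utc};

fn main() {
    let datetime: DateTime<Utc> = "2025-07-02T16:30:47Z".parse().unwrap();
    let node_id = 1; // hypothetical node id

    // Previous, day-partitioned key:
    //   year=2025/month=07/day=02/16:30:47Z_1.ndjson.gz
    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ");
    let key = format!("{ts_prefix}_{node_id}.ndjson.gz");
    assert_eq!(key, "year=2025/month=07/day=02/hour=16/16:30:47Z_1.ndjson.gz");
}
```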
@@ -109,7 +109,7 @@ pub(super) async fn upload_metrics_bucket(
    // Serialize and write into compressed buffer
    let started_at = std::time::Instant::now();
-    for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
+    for res in serialize_in_chunks_ndjson(CHUNK_SIZE, metrics, idempotency_keys) {
        let (_chunk, body) = res?;
        gzip_writer.write_all(&body).await?;
    }
@@ -216,6 +216,86 @@ fn serialize_in_chunks<'a>(
    }
}

/// Serializes the input metrics as NDJSON in chunks of chunk_size. Each event
/// is serialized as a separate JSON object on its own line. The provided
/// idempotency keys are injected into the corresponding metric events (they are
/// reused across different metrics sinks) and must have the same length as the input.
fn serialize_in_chunks_ndjson<'a>(
    chunk_size: usize,
    input: &'a [NewRawMetric],
    idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
{
    use bytes::BufMut;

    assert_eq!(input.len(), idempotency_keys.len());

    struct Iter<'a> {
        inner: std::slice::Chunks<'a, NewRawMetric>,
        idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
        chunk_size: usize,

        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
        buffer: bytes::BytesMut,
        // up to chunk_size events are reused to produce each serialized document
        scratch: Vec<Event<Ids, Name>>,
    }

    impl<'a> Iterator for Iter<'a> {
        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // first round: create events with N strings
                self.scratch.extend(
                    chunk
                        .iter()
                        .zip(&mut self.idempotency_keys)
                        .map(|(raw_metric, key)| raw_metric.as_event(key)),
                );
            } else {
                // next rounds: update_in_place to reuse allocations
                assert_eq!(self.scratch.len(), self.chunk_size);

                itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
                    .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
            }

            // Serialize each event as NDJSON (one JSON object per line)
            for event in self.scratch[..chunk.len()].iter() {
                let res = serde_json::to_writer((&mut self.buffer).writer(), event);
                if let Err(e) = res {
                    return Some(Err(e));
                }
                // Add a newline after each event to follow the NDJSON format
                self.buffer.put_u8(b'\n');
            }

            Some(Ok((chunk, self.buffer.split().freeze())))
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl ExactSizeIterator for Iter<'_> {}

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let idempotency_keys = idempotency_keys.iter();
    let scratch = Vec::new();

    Iter {
        inner,
        idempotency_keys,
        chunk_size,
        buffer,
        scratch,
    }
}

trait RawMetricExt {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
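`serialize_in_chunks_ndjson` above writes every chunk into one reusable `BytesMut` and hands out `split().freeze()` per round, so each chunk body is an independent, cheaply cloneable `Bytes` (useful for upload retries) while the underlying allocation is reused. A standalone sketch of that `bytes` pattern:

```rust
use bytes::{BufMut, BytesMut};

fn main() {
    let mut buffer = BytesMut::new();

    for line in [r#"{"a":1}"#, r#"{"b":2}"#] {
        buffer.put_slice(line.as_bytes());
        buffer.put_u8(b'\n');

        // split() takes everything written so far, leaving `buffer` empty
        // but able to reuse its allocation on the next round; freeze()
        // turns the split-off part into an immutable, cloneable Bytes.
        let body = buffer.split().freeze();
        assert!(body.ends_with(b"\n"));
        assert!(buffer.is_empty());
    }
}
```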
@@ -479,6 +559,43 @@ mod tests {
        }
    }

    #[test]
    fn chunked_serialization_ndjson() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let now = Utc::now();
        let idempotency_keys = (0..examples.len())
            .map(|i| FixedGen::new(now, "1", i as u16).generate())
            .collect::<Vec<_>>();

        // Parse NDJSON format - each line is a separate JSON object
        let parse_ndjson = |body: &[u8]| -> Vec<Event<Ids, Name>> {
            let body_str = std::str::from_utf8(body).unwrap();
            body_str
                .trim_end_matches('\n')
                .lines()
                .filter(|line| !line.is_empty())
                .map(|line| serde_json::from_str::<Event<Ids, Name>>(line).unwrap())
                .collect()
        };

        let correct = serialize_in_chunks_ndjson(examples.len(), &examples, &idempotency_keys)
            .map(|res| res.unwrap().1)
            .flat_map(|body| parse_ndjson(&body))
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks_ndjson(chunk_size, &examples, &idempotency_keys)
                .map(|res| res.unwrap().1)
                .flat_map(|body| parse_ndjson(&body))
                .collect::<Vec<_>>();

            // if these are equal, it means that the multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

@@ -180,7 +180,7 @@ def test_metric_collection(
    httpserver.check()

    # Check that at least one bucket output object is present, and that all
-    # can be decompressed and decoded.
+    # can be decompressed and decoded as NDJSON.
    bucket_dumps = {}
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
    for dirpath, _dirs, files in os.walk(env.pageserver_remote_storage.root):
@@ -188,7 +188,13 @@ def test_metric_collection(
            file_path = os.path.join(dirpath, file)
            log.info(file_path)
            if file.endswith(".gz"):
-                bucket_dumps[file_path] = json.load(gzip.open(file_path))
+                events = []
+                with gzip.open(file_path, "rt") as f:
+                    for line in f:
+                        line = line.strip()
+                        if line:
+                            events.append(json.loads(line))
+                bucket_dumps[file_path] = {"events": events}

    assert len(bucket_dumps) >= 1
    assert all("events" in data for data in bucket_dumps.values())