diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs index eba773272a..16d42b6fe4 100644 --- a/pageserver/src/consumption_metrics/upload.rs +++ b/pageserver/src/consumption_metrics/upload.rs @@ -99,7 +99,7 @@ pub(super) async fn upload_metrics_bucket( // Compose object path let datetime: DateTime = SystemTime::now().into(); - let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ"); + let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ"); let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?; // Set up a gzip writer into a buffer @@ -109,7 +109,7 @@ pub(super) async fn upload_metrics_bucket( // Serialize and write into compressed buffer let started_at = std::time::Instant::now(); - for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) { + for res in serialize_in_chunks_ndjson(CHUNK_SIZE, metrics, idempotency_keys) { let (_chunk, body) = res?; gzip_writer.write_all(&body).await?; } @@ -216,6 +216,86 @@ fn serialize_in_chunks<'a>( } } +/// Serializes the input metrics as NDJSON in chunks of chunk_size. Each event +/// is serialized as a separate JSON object on its own line. The provided +/// idempotency keys are injected into the corresponding metric events (reused +/// across different metrics sinks), and must have the same length as input. +fn serialize_in_chunks_ndjson<'a>( + chunk_size: usize, + input: &'a [NewRawMetric], + idempotency_keys: &'a [IdempotencyKey<'a>], +) -> impl ExactSizeIterator> + 'a +{ + use bytes::BufMut; + + assert_eq!(input.len(), idempotency_keys.len()); + + struct Iter<'a> { + inner: std::slice::Chunks<'a, NewRawMetric>, + idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>, + chunk_size: usize, + + // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries + buffer: bytes::BytesMut, + // chunk amount of events are reused to produce the serialized document + scratch: Vec>, + } + + impl<'a> Iterator for Iter<'a> { + type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>; + + fn next(&mut self) -> Option { + let chunk = self.inner.next()?; + + if self.scratch.is_empty() { + // first round: create events with N strings + self.scratch.extend( + chunk + .iter() + .zip(&mut self.idempotency_keys) + .map(|(raw_metric, key)| raw_metric.as_event(key)), + ); + } else { + // next rounds: update_in_place to reuse allocations + assert_eq!(self.scratch.len(), self.chunk_size); + itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys) + .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key)); + } + + // Serialize each event as NDJSON (one JSON object per line) + for event in self.scratch[..chunk.len()].iter() { + let res = serde_json::to_writer((&mut self.buffer).writer(), event); + if let Err(e) = res { + return Some(Err(e)); + } + // Add newline after each event to follow NDJSON format + self.buffer.put_u8(b'\n'); + } + + Some(Ok((chunk, self.buffer.split().freeze()))) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } + } + + impl ExactSizeIterator for Iter<'_> {} + + let buffer = bytes::BytesMut::new(); + let inner = input.chunks(chunk_size); + let idempotency_keys = idempotency_keys.iter(); + let scratch = Vec::new(); + + Iter { + inner, + idempotency_keys, + chunk_size, + buffer, + scratch, + } +} + trait RawMetricExt { fn as_event(&self, key: &IdempotencyKey<'_>) -> Event; fn update_in_place(&self, event: &mut Event, key: &IdempotencyKey<'_>); @@ -479,6 +559,43 @@ mod tests { } } + #[test] + fn chunked_serialization_ndjson() { + let examples = metric_samples(); + assert!(examples.len() > 1); + + let now = Utc::now(); + let idempotency_keys = (0..examples.len()) + .map(|i| FixedGen::new(now, "1", i as u16).generate()) + .collect::>(); + + // Parse NDJSON format - each line is a separate JSON object + let parse_ndjson = |body: &[u8]| -> Vec> { + let body_str = std::str::from_utf8(body).unwrap(); + body_str + .trim_end_matches('\n') + .lines() + .filter(|line| !line.is_empty()) + .map(|line| serde_json::from_str::>(line).unwrap()) + .collect() + }; + + let correct = serialize_in_chunks_ndjson(examples.len(), &examples, &idempotency_keys) + .map(|res| res.unwrap().1) + .flat_map(|body| parse_ndjson(&body)) + .collect::>(); + + for chunk_size in 1..examples.len() { + let actual = serialize_in_chunks_ndjson(chunk_size, &examples, &idempotency_keys) + .map(|res| res.unwrap().1) + .flat_map(|body| parse_ndjson(&body)) + .collect::>(); + + // if these are equal, it means that multi-chunking version works as well + assert_eq!(correct, actual); + } + } + #[derive(Clone, Copy)] struct FixedGen<'a>(chrono::DateTime, &'a str, u16); diff --git a/test_runner/regress/test_pageserver_metric_collection.py b/test_runner/regress/test_pageserver_metric_collection.py index 474258c9eb..52c33687ae 100644 --- a/test_runner/regress/test_pageserver_metric_collection.py +++ b/test_runner/regress/test_pageserver_metric_collection.py @@ -180,7 +180,7 @@ def test_metric_collection( httpserver.check() # Check that at least one bucket output object is present, and that all - # can be decompressed and decoded. + # can be decompressed and decoded as NDJSON. bucket_dumps = {} assert isinstance(env.pageserver_remote_storage, LocalFsStorage) for dirpath, _dirs, files in os.walk(env.pageserver_remote_storage.root): @@ -188,7 +188,13 @@ def test_metric_collection( file_path = os.path.join(dirpath, file) log.info(file_path) if file.endswith(".gz"): - bucket_dumps[file_path] = json.load(gzip.open(file_path)) + events = [] + with gzip.open(file_path, "rt") as f: + for line in f: + line = line.strip() + if line: + events.append(json.loads(line)) + bucket_dumps[file_path] = {"events": events} assert len(bucket_dumps) >= 1 assert all("events" in data for data in bucket_dumps.values())