diff --git a/test_runner/performance/test_sharded_ingest.py b/test_runner/performance/test_sharded_ingest.py index b6bf6dffa1..e965aae5a0 100644 --- a/test_runner/performance/test_sharded_ingest.py +++ b/test_runner/performance/test_sharded_ingest.py @@ -55,7 +55,6 @@ def test_sharded_ingest( # Start the endpoint. endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]) - # Ingest data and measure WAL volume and duration. with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -73,4 +72,48 @@ def test_sharded_ingest( wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024)) zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM) + total_ingested = 0 + total_records_received = 0 + ingested_by_ps = [] + for pageserver in env.pageservers: + ingested = pageserver.http_client().get_metric_value( + "pageserver_wal_ingest_bytes_received_total" + ) + records_received = pageserver.http_client().get_metric_value( + "pageserver_wal_ingest_records_received_total" + ) + + if ingested is None: + ingested = 0 + + if records_received is None: + records_received = 0 + + ingested_by_ps.append( + ( + pageserver.id, + { + "ingested": ingested, + "records_received": records_received, + }, + ) + ) + + total_ingested += int(ingested) + total_records_received += int(records_received) + + total_ingested_mb = total_ingested / (1024 * 1024) + zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER) + zenbenchmark.record( + "records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER + ) + + ingested_by_ps.sort(key=lambda x: x[0]) + for _, stats in ingested_by_ps: + for k in stats: + if k != "records_received": + stats[k] /= 1024**2 + + log.info(f"WAL ingested by each pageserver {ingested_by_ps}") + assert tenant_get_shards(env, tenant_id) == shards, "shards moved"