From aa803c5720b203ecb1f54cf7363f7531623d1e4b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 3 Feb 2023 18:12:08 +0000 Subject: [PATCH] script to scrape layer access stats How to use it on a pageserver, or, a machine that is running pageserver through `neon_local` 1. Create a postgres database from somewhere (not the pageserver you want to scrape). E.g., neon.tech. 2. Prepare a `scraper.env`, based on `scraper.env.example` 3. Perform the following steps ``` $ source scraper.env # load schema into your db $ ./pg_install/v14/bin/psql < scraper.db.schema.sql # start the scraper and watch log output $ poetry run python scraper.py ALL ``` Then watch the data pile up in the database, and do something useful with it in the future (TM). --- scraper.db.schema.sql | 55 ++++++++++ scraper.env.example | 7 ++ scraper.py | 246 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 308 insertions(+) create mode 100644 scraper.db.schema.sql create mode 100644 scraper.env.example create mode 100644 scraper.py diff --git a/scraper.db.schema.sql b/scraper.db.schema.sql new file mode 100644 index 0000000000..0b66a8b7ef --- /dev/null +++ b/scraper.db.schema.sql @@ -0,0 +1,55 @@ + +CREATE TABLE layer_map ( + scrape_ts timestamp with time zone, + pageserver_id text, + launch_id text, + tenant_id text, + timeline_id text, + layer_map jsonb +); + +CREATE VIEW last_access_ts AS + SELECT layer_map.scrape_ts, + layer_map.tenant_id, + layer_map.timeline_id, + layer_info.remote, + layer_info.layer_file_name, + layer_info.layer_file_size, + to_timestamp(((max(most_recent_rec.when_millis_since_epoch) / (1000)::numeric))::double precision) AS last_access + FROM layer_map, + LATERAL jsonb_to_recordset(layer_map.layer_map['historic_layers'::text]) layer_info(kind text, remote boolean, layer_file_name text, layer_file_size numeric, access_stats jsonb), + LATERAL jsonb_to_record(layer_info.access_stats) access_stats_rec(most_recent jsonb), + LATERAL jsonb_to_recordset( + CASE + WHEN 
(jsonb_array_length(access_stats_rec.most_recent) > 0) THEN access_stats_rec.most_recent + ELSE '[{"when_millis_since_epoch": 0}]'::jsonb + END) most_recent_rec(when_millis_since_epoch numeric) + GROUP BY layer_map.scrape_ts, layer_map.tenant_id, layer_map.timeline_id, layer_info.layer_file_name, layer_info.remote, layer_info.layer_file_size; + + +CREATE VIEW last_access AS + SELECT last_access_ts.tenant_id, + last_access_ts.timeline_id, + last_access_ts.layer_file_name, + max(last_access_ts.last_access) AS last_access, + max(last_access_ts.layer_file_size) AS layer_file_size + FROM last_access_ts + GROUP BY last_access_ts.tenant_id, last_access_ts.timeline_id, last_access_ts.layer_file_name; + + +CREATE VIEW layer_files AS + SELECT DISTINCT last_access_ts.tenant_id, + last_access_ts.timeline_id, + last_access_ts.layer_file_name + FROM last_access_ts; + + + +CREATE VIEW most_recent_scrape AS + SELECT last_access_ts.tenant_id, + last_access_ts.timeline_id, + last_access_ts.layer_file_name, + max(last_access_ts.scrape_ts) AS most_recent_scrape_ts + FROM last_access_ts + GROUP BY last_access_ts.tenant_id, last_access_ts.timeline_id, last_access_ts.layer_file_name; + diff --git a/scraper.env.example b/scraper.env.example new file mode 100644 index 0000000000..ce62c95a1e --- /dev/null +++ b/scraper.env.example @@ -0,0 +1,7 @@ +export PGUSER=scraper-staging +export PGPASSWORD=... +export PGHOST=ep-....eu-central-1.aws.neon.tech +export PGDATABASE=scraper-staging +export SCRAPE_ENDPOINT=http://localhost:9898 +export SCRAPE_ENVIRONMENT=staging +export SCRAPE_INTERVAL=99 \ No newline at end of file diff --git a/scraper.py b/scraper.py new file mode 100644 index 0000000000..7cfa32b8f2 --- /dev/null +++ b/scraper.py @@ -0,0 +1,246 @@ +# +# Periodically scrape the layer maps of one or more timelines +# and store the results in an SQL database. 
#

import argparse
import asyncio
import datetime
import json
import logging
import sys
from os import getenv
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import quote

import aiohttp
import asyncpg


class ClientException(Exception):
    """Raised when the pageserver API answers with an error status or an unexpected body."""


class Client:
    """Small async HTTP client for the pageserver API endpoints used by the scraper.

    The underlying ``aiohttp.ClientSession`` is created in ``__init__`` and closed by
    ``close()``; the class is also usable as an async context manager.
    """

    def __init__(self, pageserver_api_endpoint: str):
        self.endpoint = pageserver_api_endpoint
        self.sess = aiohttp.ClientSession()

    async def close(self):
        """Close the underlying HTTP session."""
        await self.sess.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        await self.close()

    async def _get_json(self, path: str, params: Optional[Dict[str, str]] = None) -> Tuple[Any, Any]:
        """GET ``{endpoint}{path}`` and return ``(response headers, decoded JSON body)``.

        The response is used as a context manager so that the connection is released
        on every path, including the error path. The status is checked before the
        body is decoded so that a non-JSON error response surfaces as a
        ``ClientException`` rather than as a decoding error.

        Raises:
            ClientException: if the response status is not OK.
        """
        async with self.sess.get(f"{self.endpoint}{path}", params=params) as resp:
            if not resp.ok:
                raise ClientException(f"{resp}")
            body = await resp.json()
            return resp.headers, body

    async def get_pageserver_id(self):
        """Return the ``id`` field of the ``/v1/status`` response."""
        _, body = await self._get_json("/v1/status")
        if not isinstance(body, dict):
            raise ClientException("expecting dict")
        return body["id"]

    async def get_tenant_ids(self):
        """Return the ``id`` of every entry in the ``/v1/tenant`` response."""
        _, body = await self._get_json("/v1/tenant")
        if not isinstance(body, list):
            raise ClientException("expecting list")
        return [t["id"] for t in body]

    async def get_timeline_ids(self, tenant_id):
        """Return the ``timeline_id`` of every entry listed for ``tenant_id``."""
        _, body = await self._get_json(f"/v1/tenant/{tenant_id}/timeline")
        if not isinstance(body, list):
            raise ClientException("expecting list")
        return [t["timeline_id"] for t in body]

    async def get_layer_map(self, tenant_id, timeline_id, reset) -> Tuple[Optional[str], Any]:
        """Fetch the layer map of one timeline.

        ``reset`` is forwarded verbatim as the ``reset`` query parameter.

        Returns:
            ``(launch_ts, body)`` where ``launch_ts`` is the value of the
            ``PAGESERVER_LAUNCH_TIMESTAMP`` response header (``None`` if absent)
            and ``body`` is the decoded JSON response.
        """
        headers, body = await self._get_json(
            f"/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer",
            params={"reset": reset},
        )
        launch_ts = headers.get("PAGESERVER_LAUNCH_TIMESTAMP", None)
        return (launch_ts, body)


async def scrape_timeline(ps_id: str, ps_client: Client, db: asyncpg.Pool, tenant_id, timeline_id):
    """Scrape the layer map of one timeline once and insert it into the ``layer_map`` table."""
    # Use an offset-aware UTC timestamp: the scrape_ts column is
    # `timestamp with time zone`, and a naive datetime would be interpreted
    # in the local timezone of the machine running the scraper.
    now = datetime.datetime.now(datetime.timezone.utc)
    launch_ts, layer_map = await ps_client.get_layer_map(
        tenant_id,
        timeline_id,
        # Reset the stats on every access to get max resolution on the task kind bitmap.
        # Also, under the "every scrape does a full reset" model, it's not as urgent to
        # detect pageserver restarts in post-processing, because, to answer the question
        # "How often has the layer been accessed since its existence, across ps restarts?"
        # we can simply sum up all scrape points that we have for this layer.
        reset="AllStats",
    )
    await db.execute(
        """
        insert into layer_map (scrape_ts, pageserver_id, launch_id, tenant_id, timeline_id, layer_map)
        values ($1, $2, $3, $4, $5, $6::jsonb);""",
        now,
        ps_id,
        launch_ts,
        tenant_id,
        timeline_id,
        json.dumps(layer_map),
    )


async def timeline_task(
    args, ps_id, tenant_id, timeline_id, client: Client, db: asyncpg.Pool, stop_var: asyncio.Event
):
    """
    Task loop that is responsible for scraping one timeline.

    Scrapes every ``args.interval`` seconds until ``stop_var`` is set. If a scrape
    fails, the error is logged and the task returns.
    """

    while not stop_var.is_set():
        try:
            logging.info("begin scraping timeline %s/%s", tenant_id, timeline_id)
            await scrape_timeline(ps_id, client, db, tenant_id, timeline_id)
            logging.info("finished scraping timeline %s/%s", tenant_id, timeline_id)
        except Exception:
            logging.exception("%s/%s failed, stopping scraping", tenant_id, timeline_id)
            return
        # Sleep for one interval, but wake up immediately when asked to stop.
        # TODO: use ticker-like construct instead of a relative sleep
        try:
            await asyncio.wait_for(stop_var.wait(), timeout=args.interval)
        except asyncio.TimeoutError:
            pass


async def resolve_what(what: List[str], client: Client):
    """
    Resolve the list of "what" arguments on the command line to (tenant,timeline) tuples.

    Accepted specs are ``ALL``, ``tenant_id`` and ``tenant_id:timeline_id``.

    Raises:
        ValueError: if a spec has more than two ``:``-separated components.
        Exception: whatever a timeline listing request raised.
    """
    tenant_and_timeline_ids: Set[Tuple[str, str]] = set()
    for spec in what:
        comps = spec.split(":")
        if comps == ["ALL"]:
            tenant_ids = await client.get_tenant_ids()
            get_timeline_id_coros = [client.get_timeline_ids(tenant_id) for tenant_id in tenant_ids]
            # return_exceptions=True lets all requests run to completion before
            # the first failure (if any) is reported.
            gathered = await asyncio.gather(*get_timeline_id_coros, return_exceptions=True)
            for tid, tlids in zip(tenant_ids, gathered):
                if isinstance(tlids, BaseException):
                    # Surface the real error instead of failing with a
                    # TypeError when trying to iterate the exception object.
                    raise tlids
                tenant_and_timeline_ids.update((tid, tlid) for tlid in tlids)
        elif len(comps) == 1:
            tid = comps[0]
            tlids = await client.get_timeline_ids(tid)
            tenant_and_timeline_ids.update((tid, tlid) for tlid in tlids)
        elif len(comps) == 2:
            tenant_and_timeline_ids.add((comps[0], comps[1]))
        else:
            raise ValueError(f"invalid what-spec: {spec}")

    return tenant_and_timeline_ids


async def main_impl(args, db: asyncpg.Pool, client: Client):
    """
    Controller loop that manages the per-timeline scrape tasks.

    Every 10 seconds the desired set of timelines is re-resolved; tasks are launched
    for new timelines and tasks for timelines that are no longer desired are signalled
    to stop. This loop does not terminate on its own.
    """

    psid = await client.get_pageserver_id()
    scrapedb_ps_id = f"{args.environment}-{psid}"

    logging.info(f"storing results for scrapedb_ps_id={scrapedb_ps_id}")

    active_tasks_lock = asyncio.Lock()
    active_tasks: Dict[Tuple[str, str], asyncio.Event] = {}
    # The event loop only keeps weak references to tasks; hold strong ones
    # here so that running scrape tasks cannot be garbage-collected.
    background_tasks: Set[asyncio.Task] = set()

    async def task_wrapper(key: Tuple[str, str], stop_var: asyncio.Event):
        # key and stop_var are parameters (not closed-over loop variables), so
        # each task keeps working on the timeline it was launched for.
        tenant_id, timeline_id = key
        try:
            await timeline_task(args, scrapedb_ps_id, tenant_id, timeline_id, client, db, stop_var)
        finally:
            async with active_tasks_lock:
                active_tasks.pop(key, None)

    while True:
        try:
            desired_tasks = await resolve_what(args.what, client)
        except Exception:
            logging.exception("failed to resolve --what, sleeping then retrying")
            await asyncio.sleep(10)
            continue

        async with active_tasks_lock:
            active_task_keys = set(active_tasks)

            # launch new tasks
            for key in desired_tasks - active_task_keys:
                logging.info("launching scrape task for timeline %s/%s", key[0], key[1])
                stop_var = asyncio.Event()
                active_tasks[key] = stop_var
                task = asyncio.create_task(task_wrapper(key, stop_var))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)

            # signal tasks that aren't needed anymore to stop
            for key in active_task_keys - desired_tasks:
                logging.info("stopping scrape task for timeline %s/%s", key[0], key[1])
                active_tasks[key].set()
                # the task will remove itself

        # sleep without holding the lock
        await asyncio.sleep(10)


async def main(args):
    """Open the database pool and the pageserver client, then run the controller loop."""
    # Percent-encode the credentials so that special characters such as
    # '@', ':' or '/' in them do not corrupt the connection URI.
    dsn = (
        f"postgres://{quote(args.pg_user, safe='')}:{quote(args.pg_password, safe='')}"
        f"@{args.pg_host}/{quote(args.pg_database, safe='')}?sslmode=require"
    )
    async with asyncpg.create_pool(dsn) as db:
        async with Client(args.endpoint) as client:
            return await main_impl(args, db, client)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    def envarg(flag, envvar, **kwargs):
        """Add an option that defaults to an environment variable and is required only if that is unset."""
        parser.add_argument(flag, default=getenv(envvar), required=not getenv(envvar), **kwargs)

    parser.add_argument(
        "--verbose",
        action="store_true",
        help="enable verbose logging",
    )
    envarg("--endpoint", "SCRAPE_ENDPOINT", help="HTTP API endpoint of the pageserver to scrape")
    envarg("--environment", "SCRAPE_ENVIRONMENT", help="environment of the pageserver")
    envarg("--interval", "SCRAPE_INTERVAL", type=int)
    envarg("--pg-host", "PGHOST")
    envarg("--pg-user", "PGUSER")
    envarg("--pg-password", "PGPASSWORD")
    envarg("--pg-database", "PGDATABASE")
    parser.add_argument(
        "what",
        nargs="+",
        help="what to download: ALL|tenant_id|tenant_id:timeline_id",
    )
    args = parser.parse_args()

    level = logging.INFO
    if args.verbose:
        level = logging.DEBUG
    logging.basicConfig(
        format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
        datefmt="%Y-%m-%d:%H:%M:%S",
        level=level,
    )

    sys.exit(asyncio.run(main(args)))