scripts/plumber.py · commit daac088c5e · new file, 581 lines

"""
This is an automation tool that was mostly helpful during our big AWS account migration,
but it may also be helpful in other day-to-day tasks, and it concentrates knowledge
about operations that can help during on-call.

This script executes commands on remote hosts using ssh multiplexing. See references:
https://blog.scottlowe.org/2015/12/11/using-ssh-multiplexing/
https://github.com/openssh-rust/openssh/blob/master/src/builder.rs
https://github.com/openssh-rust/openssh/blob/master/src/process_impl/session.rs
https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
https://docs.rs/openssh/0.9.8/openssh/

For use with teleport you'll need to set up the nsh script mentioned here:
https://github.com/neondatabase/cloud/wiki/Cloud%3A-access#3-access-the-nodes-with-ssm
"""

import argparse
import asyncio
import enum
import json
import os
import pprint
import tempfile
from asyncio import subprocess
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set


def show_line(output_label: Optional[str], line: str):
    if output_label is not None:
        print(f"({output_label})", line, end="")
    else:
        print(" ", line, end="")
    if not line:
        print()


async def exec_checked(
    program: str,
    args: List[str],
    err_msg: Optional[str] = None,
    output_label: Optional[str] = None,
    show_output: bool = True,
    expected_exit_codes=frozenset((0,)),
) -> List[str]:
    if show_output:
        print("+", program, *args)
    proc = await subprocess.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        limit=10 << 20,
    )

    assert proc.stdout is not None

    out = []
    while True:
        line = (await proc.stdout.readline()).decode()
        # an empty line means eof; an actual empty line from the program is represented by "\n"
        if not line:
            break
        if show_output:
            show_line(output_label, line)
        out.append(line)

    exit_code = await proc.wait()
    assert exit_code in expected_exit_codes, err_msg or f"{program} failed with {exit_code}"
    return out
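

# Minimal usage sketch for exec_checked (illustrative only; the `uname`
# invocation is an assumption, not something the original tooling runs).
# exec_checked streams the child's stdout line by line and returns the
# captured lines, asserting on an unexpected exit code.
async def _demo_exec_checked():
    lines = await exec_checked("uname", ["-a"], err_msg="uname failed")
    print("captured", len(lines), "lines")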


class Connection:
    def __init__(
        self,
        tempdir: tempfile.TemporaryDirectory,  # type: ignore
        target: str,
    ):
        self.tempdir = tempdir
        self.target = target

    def get_args(self, extra_args: List[str]):
        ctl_path = os.path.join(self.tempdir.name, "master")
        # "none" is a placeholder destination: with -S the multiplexed master
        # connection already determines the real target host
        return ["-S", ctl_path, "-o", "BatchMode=yes", *extra_args, "none"]

    async def check(self):
        args = self.get_args(["-O", "check"])
        await exec_checked("ssh", args, err_msg="master check operation failed")

    async def spawn(self, cmd: str):
        # https://github.com/openssh-rust/openssh/blob/cd8f174fafc530d8e55c2aa63add14a24cb2b94c/src/process_impl/session.rs#L72
        local_args = self.get_args(["-T", "-p", "9"])
        local_args.extend(["--", f"bash -c '{cmd}'"])
        return await exec_checked(
            "ssh", local_args, err_msg="spawn failed", output_label=self.target
        )

    async def close(self):
        args = self.get_args(["-O", "exit"])
        await exec_checked("ssh", args, err_msg="master exit operation failed")


async def connect(target: str) -> Connection:
    """
    target is passed directly to the ssh command
    """
    # NOTE: it is mentioned that this setup is not secure.
    # For better security the socket should be placed somewhere in ~/.ssh
    # or in another directory with proper permissions.
    # openssh-rust does it the same way:
    # https://github.com/openssh-rust/openssh/blob/master/src/builder.rs
    connection_dir = tempfile.TemporaryDirectory(suffix=".ssh-multiplexed")
    # ("-E logfile" could be added here for debugging)
    await exec_checked(
        "ssh",
        [
            "-S",
            os.path.join(connection_dir.name, "master"),
            "-M",  # Places the ssh client into “master” mode for connection sharing.
            "-f",  # Requests ssh to go to background just before command execution.
            "-N",  # Do not execute a remote command. This is useful for just forwarding ports.
            "-o",
            "BatchMode=yes",
            target,
        ],
        err_msg="starting master process failed",
    )
    return Connection(tempdir=connection_dir, target=target)
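

# Hypothetical end-to-end sketch of the helpers above (the host name is an
# assumption): open a multiplexed master connection, run a command through it,
# then tear the master down.
async def _demo_connection():
    conn = await connect("pageserver-0.example.internal")
    await conn.check()  # verify the master control socket is alive
    out = await conn.spawn("hostname && uptime")
    print("".join(out))
    await conn.close()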


class Timer:
    def __init__(self, msg: str) -> None:
        self.t0 = datetime.now()
        self.msg = msg

    def __enter__(self):
        return None

    def __exit__(self, *_):
        print(self.msg, datetime.now() - self.t0)


def parse_date(s: str) -> date:
    return datetime.strptime(s, "%Y-%m-%d").date()


def write_line(f, line: str):
    f.write(line)
    f.write("\n")


async def pageserver_tenant_sizes(
    pageserver_target: str, tenants_of_interest: Optional[List[str]] = None
) -> Dict[str, int]:
    """
    With on-demand downloads this should rather look at the physical size API.
    For old projects, since we don't have eviction yet,
    we can look at the local fs state.
    """
    if tenants_of_interest is not None:
        tenants_of_interest = set(tenants_of_interest)  # type: ignore

    ps_connection = await connect(pageserver_target)
    out = await ps_connection.spawn("du -sb /storage/pageserver/data/tenants/* | sort -rh")

    tenants = {}

    for line in out:
        if line.startswith("du: cannot read directory"):
            continue

        # each du line looks like "<size-in-bytes>\t<tenant directory path>"
        size, tenant_path = map(str.strip, line.split())
        tenant = Path(tenant_path).stem
        if tenants_of_interest is not None:
            if tenant not in tenants_of_interest:
                continue

        tenants[tenant] = int(size)

    await ps_connection.close()  # tear down the multiplexed master connection
    return tenants
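

# Illustrative sketch (not part of the original flow) of how one `du -sb` line
# is turned into a (tenant, size) pair; the tenant id and size are made up.
def _demo_parse_du_line():
    line = "6442450944\t/storage/pageserver/data/tenants/de200bd42b49cc18\n"
    size, tenant_path = map(str.strip, line.split())
    assert Path(tenant_path).stem == "de200bd42b49cc18"
    assert int(size) == 6442450944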


async def fetch_ps_size(args):
    if args.input is not None:
        tenants = Path(args.input).read_text().splitlines()
    else:
        tenants = None

    sizes = await pageserver_tenant_sizes(args.target, tenants_of_interest=tenants)

    total = 0
    for tenant, size in sorted(sizes.items(), key=lambda x: x[1], reverse=True):
        total += size
        print(tenant, size)
    print("total", total)


@enum.unique
class Env(enum.Enum):
    STAGING = "staging"
    PRODUCTION = "production"


class ConsoleAdminShortcuts:
    def __init__(self, env: Env, verbose: bool = False):
        if env is Env.STAGING:
            self.admin_base_url = "https://console.neon.tech/api/v1"
            self.management_base_url = "http://console-staging.local:3440/management/api/v2"
        elif env is Env.PRODUCTION:
            self.admin_base_url = "https://console.neon.tech"
            self.management_base_url = "http://console-release.local:3441/management/api/v2"

        self.api_token = os.getenv("CONSOLE_ADMIN_API_TOKEN")
        assert self.api_token, '"CONSOLE_ADMIN_API_TOKEN" is missing in env'

        self.verbose = verbose

    async def check_availability(self, project_id: str):
        url = f"{self.admin_base_url}/admin/projects/{project_id}/check_availability"
        output = await exec_checked(
            "curl",
            [
                "--silent",
                "--fail",
                "-XPOST",
                url,
                "-H",
                f"Authorization: Bearer {self.api_token}",
                "-H",
                "Accept: application/json",
            ],
            show_output=self.verbose,
        )
        assert len(output) == 1  # output should be one line of json
        return json.loads(output.pop())

    async def get_operation(self, operation_id: str):
        url = f"{self.admin_base_url}/admin/operations/{operation_id}"
        output = await exec_checked(
            "curl",
            [
                "--silent",
                "--fail",
                url,
                "-H",
                f"Authorization: Bearer {self.api_token}",
                "-H",
                "Accept: application/json",
            ],
            show_output=self.verbose,
        )
        assert len(output) == 1  # output should be one line of json
        return json.loads(output.pop())

    async def get_pageservers(self):
        url = f"{self.admin_base_url}/admin/pageservers"
        output = await exec_checked(
            "curl",
            [
                "--silent",
                "--fail",
                url,
                "-H",
                f"Authorization: Bearer {self.api_token}",
                "-H",
                "Accept: application/json",
            ],
            show_output=self.verbose,
        )
        assert len(output) == 1  # output should be one line of json
        return json.loads(output.pop())

    async def set_maintenance(self, project_id: str, maintenance: bool) -> Dict[str, Any]:
        """
        Example response:
        {
            "project": {
                "id": "tight-wood-864662",
                "maintenance_set_at": "2023-01-31T13:36:45.90346Z"
            },
            "operations": [
                {
                    "id": "216142e0-fbb7-4f41-a470-e63408d4d6b4"
                }
            ]
        }
        """
        url = f"{self.management_base_url}/projects/{project_id}/maintenance"
        data = json.dumps({"maintenance": maintenance})
        if not self.verbose:
            args = ["--silent"]
        else:
            args = []
        args.extend(
            [
                "--fail",
                "-XPUT",
                url,
                "-H",
                f"Authorization: Bearer {self.api_token}",
                "-H",
                "Accept: application/json",
                "-d",
                data,
            ]
        )
        output = await exec_checked(
            "curl",
            args,
            show_output=self.verbose,
        )
        assert len(output) == 1  # output should be one line of json
        ret = json.loads(output.pop())
        assert isinstance(ret, Dict)
        return ret

    async def fetch_branches(self, project_id: str):
        url = f"{self.admin_base_url}/admin/branches?project_id={project_id}"
        output = await exec_checked(
            "curl",
            [
                "--silent",
                "--fail",
                url,
                "-H",
                f"Authorization: Bearer {self.api_token}",
                "-H",
                "Accept: application/json",
            ],
            show_output=self.verbose,
        )
        assert len(output) == 1  # output should be one line of json
        return json.loads(output.pop())


async def poll_pending_ops(console: ConsoleAdminShortcuts, pending_ops: Set[str]):
    finished = set()  # needed because sets cannot be changed during iteration
    for pending_op in pending_ops:
        data = await console.get_operation(pending_op)
        operation = data["operation"]
        status = operation["status"]
        if status == "failed":
            print(f"ERROR: operation {pending_op} failed")
            continue

        if operation["failures_count"] != 0:
            print(f"WARN: operation {pending_op} has failures != 0")
            continue

        if status == "finished":
            print(f"operation {pending_op} finished")
            finished.add(pending_op)
        else:
            print(f"operation {pending_op} is still pending: {status}")

    pending_ops.difference_update(finished)
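

# Minimal sketch of the drain pattern the handlers below rely on:
# poll_pending_ops mutates the set in place, so callers loop until it is empty.
# (The operation id here is the one from the set_maintenance docstring example.)
async def _demo_drain_ops():
    console = ConsoleAdminShortcuts(env=Env.STAGING)
    pending = {"216142e0-fbb7-4f41-a470-e63408d4d6b4"}
    while pending:
        await poll_pending_ops(console, pending)
        await asyncio.sleep(5)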


async def check_availability(args):
    console = ConsoleAdminShortcuts(env=Env(args.env))
    max_concurrent_checks = args.max_concurrent_checks

    # reverse to keep the order because we will be popping from the end
    projects: List[str] = list(reversed(Path(args.input).read_text().splitlines()))
    print("n_projects", len(projects))

    pending_ops: Set[str] = set()
    while projects:
        # walk through pending ops
        if pending_ops:
            print("pending", len(pending_ops), pending_ops)
            await poll_pending_ops(console, pending_ops)

        # schedule new ops if limit allows
        while len(pending_ops) < max_concurrent_checks and len(projects) > 0:
            project = projects.pop()
            print("starting:", project, len(projects))
            # there can be many operations, one for each endpoint
            data = await console.check_availability(project)
            for operation in data["operations"]:
                pending_ops.add(operation["ID"])
            # wait a bit before starting next one
            await asyncio.sleep(2)

        if projects:
            # sleep a little bit to give operations time to finish
            await asyncio.sleep(5)

    print("all scheduled, poll pending", len(pending_ops), pending_ops, projects)
    while pending_ops:
        await poll_pending_ops(console, pending_ops)
        await asyncio.sleep(5)


async def maintain(args):
    console = ConsoleAdminShortcuts(env=Env(args.env))
    finish_flag = args.finish

    projects: List[str] = Path(args.input).read_text().splitlines()
    print("n_projects", len(projects))

    pending_ops: Set[str] = set()

    for project in projects:
        data = await console.set_maintenance(project, maintenance=not finish_flag)
        print(project, len(data["operations"]))
        for operation in data["operations"]:
            pending_ops.add(operation["id"])

    if finish_flag:
        assert len(pending_ops) == 0
        return

    print("all scheduled, poll pending", len(pending_ops), pending_ops)
    while pending_ops:
        await poll_pending_ops(console, pending_ops)
        print("n pending ops:", len(pending_ops))
        if pending_ops:
            await asyncio.sleep(5)


SOURCE_BUCKET = "zenith-storage-oregon"
AWS_REGION = "us-west-2"
SAFEKEEPER_SOURCE_PREFIX_IN_BUCKET = "prod-1/wal"


async def fetch_sk_s3_size(args):
    tenants: List[str] = Path(args.input).read_text().splitlines()

    total_objects = 0
    total_size = 0
    for tenant in tenants:
        wal_prefix = f"s3://{SOURCE_BUCKET}/{SAFEKEEPER_SOURCE_PREFIX_IN_BUCKET}/{tenant}"
        result = await exec_checked(
            "aws",
            [
                "--profile",
                "neon_main",
                "s3",
                "ls",
                "--recursive",
                "--summarize",
                wal_prefix,
            ],
            # exit code 1 is expected when the prefix matches no objects
            expected_exit_codes={0, 1},
            show_output=False,
        )
        # with --summarize the listing ends with "Total Objects: N" and
        # "Total Size: N" lines; take the trailing number from each
        objects = int(result[-2].rsplit(maxsplit=1).pop())
        total_objects += objects

        size = int(result[-1].rsplit(maxsplit=1).pop())
        total_size += size

        print(tenant, "objects", objects, "size", size)

    print("total_objects", total_objects, "total_size", total_size)
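

# Parsing sketch for the `aws s3 ls --summarize` trailer consumed above; the
# numbers are made up, not real data.
def _demo_parse_summarize():
    result = ["...object listing...\n", "Total Objects: 1234\n", "Total Size: 567890\n"]
    assert int(result[-2].rsplit(maxsplit=1).pop()) == 1234
    assert int(result[-1].rsplit(maxsplit=1).pop()) == 567890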


async def fetch_branches(args):
    console = ConsoleAdminShortcuts(env=Env(args.env))
    project_id = args.project_id

    pprint.pprint(await console.fetch_branches(project_id=project_id))


async def get_pageservers(args):
    console = ConsoleAdminShortcuts(env=Env(args.env))

    pprint.pprint(await console.get_pageservers())


async def main():
    parser = argparse.ArgumentParser("migrator")
    sub = parser.add_subparsers(title="commands", dest="subparser_name")

    split_parser = sub.add_parser(
        "split",
    )
    split_parser.add_argument(
        "--input",
        help="CSV file with results from the snowflake query mentioned in the README.",
        required=True,
    )
    split_parser.add_argument(
        "--out",
        help="Directory to store groups of projects. Directory name is the pageserver id.",
        required=True,
    )
    split_parser.add_argument(
        "--last-usage-cutoff",
        dest="last_usage_cutoff",
        help="Projects which do not have compute time starting from the passed date (e.g. 2022-12-01) will be considered not used recently",
        required=True,
    )
    split_parser.add_argument(
        "--select-pageserver-id",
        help="Filter input for this pageserver id",
        required=True,
    )

    fetch_ps_size_parser = sub.add_parser("fetch-ps-size")
    fetch_ps_size_parser.add_argument(
        "--target",
        help="Target pageserver host as resolvable by ssh",
        required=True,
    )
    fetch_ps_size_parser.add_argument(
        "--input",
        help="File containing the list of tenants to include",
    )

    check_availability_parser = sub.add_parser("check-availability")
    check_availability_parser.add_argument(
        "--input",
        help="File containing the list of projects to run availability checks for",
    )
    check_availability_parser.add_argument(
        "--env", choices=["staging", "production"], default="staging"
    )
    check_availability_parser.add_argument(
        "--max-concurrent-checks",
        help="Max number of simultaneously active availability checks",
        type=int,
        default=50,
    )

    maintain_parser = sub.add_parser("maintain")
    maintain_parser.add_argument(
        "--input",
        help="File containing the list of projects",
    )
    maintain_parser.add_argument("--env", choices=["staging", "production"], default="staging")
    maintain_parser.add_argument(
        "--finish",
        action="store_true",
    )

    fetch_sk_s3_size_parser = sub.add_parser("fetch-sk-s3-size")
    fetch_sk_s3_size_parser.add_argument(
        "--input",
        help="File containing the list of tenants",
    )

    fetch_branches_parser = sub.add_parser("fetch-branches")
    fetch_branches_parser.add_argument("--project-id")
    fetch_branches_parser.add_argument(
        "--env", choices=["staging", "production"], default="staging"
    )

    get_pageservers_parser = sub.add_parser("get-pageservers")
    get_pageservers_parser.add_argument(
        "--env", choices=["staging", "production"], default="staging"
    )

    args = parser.parse_args()

    # NOTE: the "split" subcommand is registered above but has no handler here,
    # so it currently falls through to print_help().
    handlers = {
        "fetch-ps-size": fetch_ps_size,
        "check-availability": check_availability,
        "maintain": maintain,
        "fetch-sk-s3-size": fetch_sk_s3_size,
        "fetch-branches": fetch_branches,
        "get-pageservers": get_pageservers,
    }

    handler = handlers.get(args.subparser_name)
    if handler:
        await handler(args)
    else:
        parser.print_help()


if __name__ == "__main__":
    asyncio.run(main())
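
# Example invocations (host names and input files are illustrative):
#   python scripts/plumber.py fetch-ps-size --target pageserver-0 --input tenants.txt
#   python scripts/plumber.py check-availability --input projects.txt --env staging --max-concurrent-checks 50
#   python scripts/plumber.py maintain --input projects.txt --env production
#   python scripts/plumber.py get-pageservers --env staging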