diff --git a/Cargo.lock b/Cargo.lock index 40f4358d98..c770f576c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,27 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -703,6 +724,21 @@ dependencies = [ "termcolor", ] +[[package]] +name = "etcd-client" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585de5039d1ecce74773db49ba4e8107e42be7c2cd0b1a9e7fce27181db7b118" +dependencies = [ + "http", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower-service", +] + [[package]] name = "fail" version = "0.5.0" @@ -741,6 +777,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fixedbitset" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" + [[package]] name = "fnv" version = "1.0.7" @@ -926,7 +968,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.6.9", "tracing", ] @@ -954,6 +996,15 @@ dependencies = [ "ahash 0.7.6", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1075,6 +1126,18 @@ dependencies = [ "tokio-rustls 0.23.2", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1308,9 +1371,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.14" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" dependencies = [ "libc", "log", @@ -1328,6 +1391,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "nix" version = "0.23.1" @@ -1557,6 +1626,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "petgraph" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "phf" version = "0.8.0" @@ -1776,6 +1855,59 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.9.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "proxy" version = "0.1.0" @@ -1979,7 +2111,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls 0.23.2", - "tokio-util", + "tokio-util 0.6.9", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -2508,9 +2640,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.16.1" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ "bytes", "libc", @@ -2520,10 +2652,21 @@ dependencies = [ "once_cell", "pin-project-lite", "signal-hook-registry", + "socket2", "tokio-macros", "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -2554,7 +2697,7 @@ dependencies = [ "postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "socket2", "tokio", - "tokio-util", + "tokio-util 0.6.9", ] [[package]] @@ -2576,7 +2719,7 @@ dependencies = [ "postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", "socket2", "tokio", - "tokio-util", + "tokio-util 0.6.9", ] [[package]] @@ -2641,6 +2784,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -2663,6 +2820,75 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a" +dependencies = [ + "async-stream", + "async-trait", + "base64 0.13.0", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + 
"prost-derive", + "tokio", + "tokio-stream", + "tokio-util 0.6.9", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757" +dependencies = [ + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util 0.7.0", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -2676,6 +2902,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2768,6 +2995,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + [[package]] name = "unicode-width" version = "0.1.9" @@ -2838,6 +3071,7 @@ dependencies = [ "const_format", "crc32c", "daemonize", + "etcd-client", "fs2", "hex", "humantime", @@ -2850,11 +3084,13 @@ dependencies = [ "rust-s3", "serde", "serde_json", + "serde_with", "signal-hook", "tempfile", "tokio", "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "tracing", + "url", "walkdir", "workspace_hack", "zenith_metrics", diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 00ace431e6..2bdc76e876 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -57,6 +57,10 @@ pub struct LocalEnv { #[serde(default)] pub private_key_path: PathBuf, + // A comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'. 
+ #[serde(default)] + pub broker_endpoints: Option, + pub pageserver: PageServerConf, #[serde(default)] diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 969e2cd531..89ab0a31ee 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -73,6 +73,8 @@ pub struct SafekeeperNode { pub http_base_url: String, pub pageserver: Arc, + + broker_endpoints: Option, } impl SafekeeperNode { @@ -89,6 +91,7 @@ impl SafekeeperNode { http_client: Client::new(), http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), pageserver, + broker_endpoints: env.broker_endpoints.clone(), } } @@ -135,6 +138,9 @@ impl SafekeeperNode { if !self.conf.sync { cmd.arg("--no-sync"); } + if let Some(ref ep) = self.broker_endpoints { + cmd.args(&["--broker-endpoints", ep]); + } if !cmd.status()?.success() { bail!( diff --git a/test_runner/README.md b/test_runner/README.md index a56c2df2c0..ee171ae6a0 100644 --- a/test_runner/README.md +++ b/test_runner/README.md @@ -10,6 +10,8 @@ Prerequisites: below to run from other directories. - The zenith git repo, including the postgres submodule (for some tests, e.g. `pg_regress`) +- Some tests (involving storage nodes coordination) require etcd installed. Follow + [`the guide`](https://etcd.io/docs/v3.5/install/) to obtain it. ### Test Organization diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 37ce1a8bca..bdc526a125 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol -from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex +from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -22,6 +22,7 @@ from typing import List, Optional, Any # succeed and data is written def test_normal_work(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 + zenith_env_builder.broker = True env = zenith_env_builder.init_start() env.zenith_cli.create_branch('test_wal_acceptors_normal_work') @@ -326,6 +327,49 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): proc.join() +# Test that safekeepers push their info to the broker and learn peer status from it +@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH") +def test_broker(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 3 + zenith_env_builder.broker = True + zenith_env_builder.enable_local_fs_remote_storage() + env = zenith_env_builder.init_start() + + env.zenith_cli.create_branch("test_broker", "main") + pg = env.postgres.create_start('test_broker') + pg.safe_psql("CREATE TABLE t(key int primary key, value text)") + + # learn zenith timeline from compute + tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0] + timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0] + + # wait until remote_consistent_lsn gets advanced on all safekeepers + clients = [sk.http_client() for sk in env.safekeepers] + stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] + log.info(f"statuses is {stat_before}") + + pg.safe_psql("INSERT 
INTO t SELECT generate_series(1,100), 'payload'") + # force checkpoint to advance remote_consistent_lsn + with closing(env.pageserver.connect()) as psconn: + with psconn.cursor() as pscur: + pscur.execute(f"checkpoint {tenant_id} {timeline_id}") + # and wait till remote_consistent_lsn propagates to all safekeepers + started_at = time.time() + while True: + stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] + if all( + lsn_from_hex(s_after.remote_consistent_lsn) > lsn_from_hex( + s_before.remote_consistent_lsn) for s_after, + s_before in zip(stat_after, stat_before)): + break + elapsed = time.time() - started_at + if elapsed > 20: + raise RuntimeError( + f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}" + ) + time.sleep(0.5) + + class ProposerPostgres(PgProtocol): """Object for running postgres without ZenithEnv""" def __init__(self, diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 58f7294eb5..f16fe1d9cf 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -1,4 +1,5 @@ import os +import shutil import subprocess from typing import Any, List @@ -76,3 +77,8 @@ def print_gc_result(row): log.info( " total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}" .format_map(row)) + + +# path to etcd binary or None if not present. +def etcd_path(): + return shutil.which("etcd") diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 2da021a49c..a95809687a 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -33,7 +33,7 @@ from typing_extensions import Literal import requests import backoff # type: ignore -from .utils import (get_self_dir, lsn_from_hex, mkdir_if_needed, subprocess_capture) +from .utils import (etcd_path, get_self_dir, mkdir_if_needed, subprocess_capture, lsn_from_hex) from fixtures.log_helper import log """ This file contains pytest fixtures. 
A fixture is a test resource that can be @@ -433,7 +433,8 @@ class ZenithEnvBuilder: num_safekeepers: int = 0, pageserver_auth_enabled: bool = False, rust_log_override: Optional[str] = None, - default_branch_name=DEFAULT_BRANCH_NAME): + default_branch_name=DEFAULT_BRANCH_NAME, + broker: bool = False): self.repo_dir = repo_dir self.rust_log_override = rust_log_override self.port_distributor = port_distributor @@ -442,6 +443,7 @@ class ZenithEnvBuilder: self.num_safekeepers = num_safekeepers self.pageserver_auth_enabled = pageserver_auth_enabled self.default_branch_name = default_branch_name + self.broker = broker self.env: Optional[ZenithEnv] = None self.s3_mock_server: Optional[MockS3Server] = None @@ -517,6 +519,8 @@ class ZenithEnvBuilder: self.env.pageserver.stop(immediate=True) if self.s3_mock_server: self.s3_mock_server.kill() + if self.env.broker is not None: + self.env.broker.stop() class ZenithEnv: @@ -569,6 +573,16 @@ class ZenithEnv: default_tenant_id = '{self.initial_tenant.hex}' """) + self.broker = None + if config.broker: + # keep etcd datadir inside 'repo' + self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"), + port=self.port_distributor.get_port(), + peer_port=self.port_distributor.get_port()) + toml += textwrap.dedent(f""" + broker_endpoints = 'http://127.0.0.1:{self.broker.port}' + """) + # Create config for pageserver pageserver_port = PageserverPort( pg=self.port_distributor.get_port(), @@ -611,12 +625,15 @@ class ZenithEnv: self.zenith_cli.init(toml) def start(self): - # Start up the page server and all the safekeepers + # Start up the page server, all the safekeepers and the broker self.pageserver.start() for safekeeper in self.safekeepers: safekeeper.start() + if self.broker is not None: + self.broker.start() + def get_safekeeper_connstrs(self) -> str: """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """ return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) @@ -1674,6 +1691,7 @@ class Safekeeper: class SafekeeperTimelineStatus: acceptor_epoch: int flush_lsn: str + remote_consistent_lsn: str @dataclass @@ -1697,7 +1715,8 @@ class SafekeeperHttpClient(requests.Session): res.raise_for_status() resj = res.json() return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'], - flush_lsn=resj['flush_lsn']) + flush_lsn=resj['flush_lsn'], + remote_consistent_lsn=resj['remote_consistent_lsn']) def get_metrics(self) -> SafekeeperMetrics: request_result = self.get(f"http://localhost:{self.port}/metrics") @@ -1718,6 +1737,54 @@ class SafekeeperHttpClient(requests.Session): return metrics +@dataclass +class Etcd: + """ An object managing etcd instance """ + datadir: str + port: int + peer_port: int + handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon + + def check_status(self): + s = requests.Session() + s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry + s.get(f"http://localhost:{self.port}/health").raise_for_status() + + def start(self): + pathlib.Path(self.datadir).mkdir(exist_ok=True) + etcd_full_path = etcd_path() + if etcd_full_path is None: + raise Exception('etcd not found') + + with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file: + args = [ + etcd_full_path, + f"--data-dir={self.datadir}", + f"--listen-client-urls=http://localhost:{self.port}", + f"--advertise-client-urls=http://localhost:{self.port}", + f"--listen-peer-urls=http://localhost:{self.peer_port}" + ] + self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file) + + 
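+        # The loop below polls etcd's /health endpoint on the client URL until it
+        # responds, so tests only proceed once the daemon is actually ready:
+        # retry every 0.5s and give up after ~5s, surfacing the last error.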
# wait for start + started_at = time.time() + while True: + try: + self.check_status() + except Exception as e: + elapsed = time.time() - started_at + if elapsed > 5: + raise RuntimeError(f"timed out waiting {elapsed:.0f}s for etcd start: {e}") + time.sleep(0.5) + else: + break # success + + def stop(self): + if self.handle is not None: + self.handle.terminate() + self.handle.wait() + + def get_test_output_dir(request: Any) -> str: """ Compute the working directory for an individual test. """ test_name = request.node.name diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index f59c24816d..e8523d27d1 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -22,11 +22,14 @@ anyhow = "1.0" crc32c = "0.6.0" humantime = "2.1.0" walkdir = "2" +url = "2.2.2" signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } +serde_with = {version = "1.12.0"} hex = "0.4.3" const_format = "0.2.21" tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +etcd-client = "0.8.3" postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 6c45115e5f..b3087a1004 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -11,18 +11,19 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; +use url::{ParseError, Url}; use walkeeper::control_file::{self}; use zenith_utils::http::endpoint; use zenith_utils::zid::ZNodeId; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; use tokio::sync::mpsc; -use walkeeper::callmemaybe; use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::SafeKeeperConf; +use walkeeper::{broker, callmemaybe}; use zenith_utils::shutdown::exit_now; use zenith_utils::signals; @@ -104,6 +105,11 @@ fn main() -> Result<()> { ) .arg( Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer") + ).arg( + Arg::new("broker-endpoints") + .long("broker-endpoints") + .takes_value(true) + .help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 
'http://127.0.0.1:2379'"), ) .get_matches(); @@ -154,6 +160,11 @@ fn main() -> Result<()> { )); } + if let Some(addr) = arg_matches.value_of("broker-endpoints") { + let collected_ep: Result, ParseError> = addr.split(',').map(Url::parse).collect(); + conf.broker_endpoints = Some(collected_ep?); + } + start_safekeeper(conf, given_id, arg_matches.is_present("init")) } @@ -259,11 +270,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b threads.push(wal_acceptor_thread); + let conf_cloned = conf.clone(); let callmemaybe_thread = thread::Builder::new() .name("callmemaybe thread".into()) .spawn(|| { // thread code - let thread_result = callmemaybe::thread_main(conf, rx); + let thread_result = callmemaybe::thread_main(conf_cloned, rx); if let Err(e) = thread_result { error!("callmemaybe thread terminated: {}", e); } @@ -271,6 +283,17 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b .unwrap(); threads.push(callmemaybe_thread); + if conf.broker_endpoints.is_some() { + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("broker thread".into()) + .spawn(|| { + broker::thread_main(conf_); + })?, + ); + } + // TODO: put more thoughts into handling of failed threads // We probably should restart them. diff --git a/walkeeper/src/broker.rs b/walkeeper/src/broker.rs new file mode 100644 index 0000000000..147497d673 --- /dev/null +++ b/walkeeper/src/broker.rs @@ -0,0 +1,211 @@ +//! Communication with etcd, providing safekeeper peers and pageserver coordination. + +use anyhow::bail; +use anyhow::Context; +use anyhow::Error; +use anyhow::Result; +use etcd_client::Client; +use etcd_client::EventType; +use etcd_client::PutOptions; +use etcd_client::WatchOptions; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; +use std::str::FromStr; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::{runtime, time::sleep}; +use tracing::*; +use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTimelineId; +use zenith_utils::{ + lsn::Lsn, + zid::{ZNodeId, ZTenantTimelineId}, +}; + +use crate::{safekeeper::Term, timeline::GlobalTimelines, SafeKeeperConf}; + +const RETRY_INTERVAL_MSEC: u64 = 1000; +const PUSH_INTERVAL_MSEC: u64 = 1000; +const LEASE_TTL_SEC: i64 = 5; +// TODO: add global zenith installation ID. +const ZENITH_PREFIX: &str = "zenith"; + +/// Published data about safekeeper. Fields made optional for easy migrations. +#[serde_as] +#[derive(Deserialize, Serialize)] +pub struct SafekeeperInfo { + /// Term of the last entry. + pub last_log_term: Option, + /// LSN of the last record. + #[serde_as(as = "Option")] + pub flush_lsn: Option, + /// Up to which LSN safekeeper regards its WAL as committed. + #[serde_as(as = "Option")] + pub commit_lsn: Option, + /// LSN up to which safekeeper offloaded WAL to s3. + #[serde_as(as = "Option")] + pub s3_wal_lsn: Option, + /// LSN of last checkpoint uploaded by pageserver. + #[serde_as(as = "Option")] + pub remote_consistent_lsn: Option, + #[serde_as(as = "Option")] + pub peer_horizon_lsn: Option, +} + +pub fn thread_main(conf: SafeKeeperConf) { + let runtime = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let _enter = info_span!("broker").entered(); + info!("started, broker endpoints {:?}", conf.broker_endpoints); + + runtime.block_on(async { + main_loop(conf).await; + }); +} + +/// Prefix to timeline related data. 
+fn timeline_path(zttid: &ZTenantTimelineId) -> String { + format!( + "{}/{}/{}", + ZENITH_PREFIX, zttid.tenant_id, zttid.timeline_id + ) +} + +/// Key to per timeline per safekeeper data. +fn timeline_safekeeper_path(zttid: &ZTenantTimelineId, sk_id: ZNodeId) -> String { + format!("{}/safekeeper/{}", timeline_path(zttid), sk_id) +} + +/// Push once in a while data about all active timelines to the broker. +async fn push_loop(conf: SafeKeeperConf) -> Result<()> { + let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?; + + // Get and maintain lease to automatically delete obsolete data + let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; + let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?; + + let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); + loop { + // Note: we lock runtime here and in timeline methods as GlobalTimelines + // is under plain mutex. That's ok, all this code is not performance + // sensitive and there is no risk of deadlock as we don't await while + // lock is held. + let active_tlis = GlobalTimelines::get_active_timelines(); + for zttid in &active_tlis { + if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) { + let sk_info = tli.get_public_info(); + let put_opts = PutOptions::new().with_lease(lease.id()); + client + .put( + timeline_safekeeper_path(zttid, conf.my_id), + serde_json::to_string(&sk_info)?, + Some(put_opts), + ) + .await + .context("failed to push safekeeper info")?; + } + } + // revive the lease + keeper + .keep_alive() + .await + .context("failed to send LeaseKeepAliveRequest")?; + ka_stream + .message() + .await + .context("failed to receive LeaseKeepAliveResponse")?; + sleep(push_interval).await; + } +} + +/// Subscribe and fetch all the interesting data from the broker. +async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { + lazy_static! { + static ref TIMELINE_SAFEKEEPER_RE: Regex = + Regex::new(r"^zenith/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$") + .unwrap(); + } + let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?; + loop { + let wo = WatchOptions::new().with_prefix(); + // TODO: subscribe only to my timelines + let (_, mut stream) = client.watch(ZENITH_PREFIX, Some(wo)).await?; + while let Some(resp) = stream.message().await? { + if resp.canceled() { + bail!("watch canceled"); + } + + for event in resp.events() { + if EventType::Put == event.event_type() { + if let Some(kv) = event.kv() { + if let Some(caps) = TIMELINE_SAFEKEEPER_RE.captures(kv.key_str()?) { + let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let zttid = ZTenantTimelineId::new(tenant_id, timeline_id); + let safekeeper_id = ZNodeId(caps.get(3).unwrap().as_str().parse()?); + let value_str = kv.value_str()?; + match serde_json::from_str::(value_str) { + Ok(safekeeper_info) => { + if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { + tli.record_safekeeper_info(&safekeeper_info, safekeeper_id)? + } + } + Err(err) => warn!( + "failed to deserialize safekeeper info {}: {}", + value_str, err + ), + } + } + } + } + } + } + } +} + +async fn main_loop(conf: SafeKeeperConf) { + let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC)); + let mut push_handle: Option>> = None; + let mut pull_handle: Option>> = None; + // Selecting on JoinHandles requires some squats; is there a better way to + // reap tasks individually? 
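+    //
+    // For now the approach is: the ticker arm fires every RETRY_INTERVAL_MSEC and
+    // (re)spawns push_loop / pull_loop whenever the corresponding handle is None,
+    // while the `if ..._handle.is_some()` guards keep select! from polling a handle
+    // that was never spawned. A finished task (always an error, since the loops
+    // never return Ok) or a panicked one is logged, its handle is dropped, and the
+    // next tick restarts it.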
+ + // Handling failures in task itself won't catch panic and in Tokio, task's + // panic doesn't kill the whole executor, so it is better to do reaping + // here. + loop { + tokio::select! { + res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => { + // was it panic or normal error? + let err = match res { + Ok(res_internal) => res_internal.unwrap_err(), + Err(err_outer) => err_outer.into(), + }; + warn!("push task failed: {:?}", err); + push_handle = None; + }, + res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => { + // was it panic or normal error? + let err = match res { + Ok(res_internal) => res_internal.unwrap_err(), + Err(err_outer) => err_outer.into(), + }; + warn!("pull task failed: {:?}", err); + pull_handle = None; + }, + _ = ticker.tick() => { + if push_handle.is_none() { + push_handle = Some(tokio::spawn(push_loop(conf.clone()))); + } + if pull_handle.is_none() { + pull_handle = Some(tokio::spawn(pull_loop(conf.clone()))); + } + } + } + } +} diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index ead6fab9fb..00d177da56 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -168,7 +168,14 @@ impl SafekeeperPostgresHandler { fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> { let start_pos = self.timeline.get().get_end_of_wal(); let lsn = start_pos.to_string(); - let sysid = self.timeline.get().get_info().server.system_id.to_string(); + let sysid = self + .timeline + .get() + .get_state() + .1 + .server + .system_id + .to_string(); let lsn_bytes = lsn.as_bytes(); let tli = PG_TLI.to_string(); let tli_bytes = tli.as_bytes(); diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 74f7f4a735..06a0682c37 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -86,23 +86,24 @@ async fn timeline_status_handler(request: Request) -> Result Result<()> { fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> { // add new term to existing history - let history = spg.timeline.get().get_info().acceptor_state.term_history; + let history = spg.timeline.get().get_state().1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -142,7 +142,7 @@ fn append_logical_message( msg: &AppendLogicalMessage, ) -> Result { let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = spg.timeline.get().get_info(); + let sk_state = spg.timeline.get().get_state().1; let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index dfd71e4de2..69423d42d8 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -1,9 +1,11 @@ // use std::path::PathBuf; use std::time::Duration; +use url::Url; use zenith_utils::zid::{ZNodeId, ZTenantTimelineId}; +pub mod broker; pub mod callmemaybe; pub mod control_file; pub mod control_file_upgrade; @@ -47,6 +49,7 @@ pub struct SafeKeeperConf { pub ttl: Option, pub recall_period: Duration, pub my_id: ZNodeId, + pub broker_endpoints: Option>, } impl SafeKeeperConf { @@ -71,6 +74,7 @@ impl Default for SafeKeeperConf { ttl: None, recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: ZNodeId(0), + broker_endpoints: None, } } } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 8300b32b42..307a67e5f3 100644 --- 
a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -193,7 +193,7 @@ pub struct SafeKeeperState { pub peer_horizon_lsn: Lsn, /// LSN of the oldest known checkpoint made by pageserver and successfully /// pushed to s3. We don't remove WAL beyond it. Persisted only for - /// informational purposes, we receive it from pageserver. + /// informational purposes, we receive it from pageserver (or broker). pub remote_consistent_lsn: Lsn, // Peers and their state as we remember it. Knowing peers themselves is // fundamental; but state is saved here only for informational purposes and @@ -203,11 +203,13 @@ pub struct SafeKeeperState { } #[derive(Debug, Clone)] -// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; they are -// not flushed yet. +// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values +// are not flushed yet. pub struct SafekeeperMemState { pub commit_lsn: Lsn, + pub s3_wal_lsn: Lsn, // TODO: keep only persistent version pub peer_horizon_lsn: Lsn, + pub remote_consistent_lsn: Lsn, } impl SafeKeeperState { @@ -494,14 +496,13 @@ pub struct SafeKeeper { metrics: SafeKeeperMetrics, /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn. - global_commit_lsn: Lsn, + pub global_commit_lsn: Lsn, /// LSN since the proposer safekeeper currently talking to appends WAL; /// determines epoch switch point. epoch_start_lsn: Lsn, pub inmem: SafekeeperMemState, // in memory part - - pub s: SafeKeeperState, // persistent part + pub s: SafeKeeperState, // persistent part pub control_store: CTRL, pub wal_store: WAL, @@ -529,7 +530,9 @@ where epoch_start_lsn: Lsn(0), inmem: SafekeeperMemState { commit_lsn: state.commit_lsn, + s3_wal_lsn: state.s3_wal_lsn, peer_horizon_lsn: state.peer_horizon_lsn, + remote_consistent_lsn: state.remote_consistent_lsn, }, s: state, control_store, @@ -545,8 +548,7 @@ where .up_to(self.wal_store.flush_lsn()) } - #[cfg(test)] - fn get_epoch(&self) -> Term { + pub fn get_epoch(&self) -> Term { self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn()) } @@ -697,7 +699,7 @@ where } /// Advance commit_lsn taking into account what we have locally - fn update_commit_lsn(&mut self) -> Result<()> { + pub fn update_commit_lsn(&mut self) -> Result<()> { let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn()); assert!(commit_lsn >= self.inmem.commit_lsn); diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 1febd71842..f12fb5cb4a 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -230,7 +230,7 @@ impl ReplicationConn { let mut wal_seg_size: usize; loop { - wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize; + wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize; if wal_seg_size == 0 { error!("Cannot start replication before connecting to wal_proposer"); sleep(Duration::from_secs(1)); diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index b53f2e086b..b10ab97cc1 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -17,12 +17,14 @@ use tracing::*; use zenith_utils::lsn::Lsn; use zenith_utils::zid::{ZNodeId, ZTenantTimelineId}; +use crate::broker::SafekeeperInfo; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use crate::control_file; use crate::control_file::Storage as cf_storage; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, + SafekeeperMemState, }; use crate::send_wal::HotStandbyFeedback; use 
crate::wal_storage; @@ -349,6 +351,11 @@ impl Timeline { Ok(false) } + fn is_active(&self) -> bool { + let shared_state = self.mutex.lock().unwrap(); + shared_state.active + } + /// Timed wait for an LSN to be committed. /// /// Returns the last committed LSN, which will be at least @@ -410,8 +417,61 @@ impl Timeline { Ok(rmsg) } - pub fn get_info(&self) -> SafeKeeperState { - self.mutex.lock().unwrap().sk.s.clone() + pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { + let shared_state = self.mutex.lock().unwrap(); + (shared_state.sk.inmem.clone(), shared_state.sk.s.clone()) + } + + /// Prepare public safekeeper info for reporting. + pub fn get_public_info(&self) -> SafekeeperInfo { + let shared_state = self.mutex.lock().unwrap(); + SafekeeperInfo { + last_log_term: Some(shared_state.sk.get_epoch()), + flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), + // note: this value is not flushed to control file yet and can be lost + commit_lsn: Some(shared_state.sk.inmem.commit_lsn), + s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn), + // TODO: rework feedbacks to avoid max here + remote_consistent_lsn: Some(max( + shared_state.get_replicas_state().remote_consistent_lsn, + shared_state.sk.inmem.remote_consistent_lsn, + )), + peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), + } + } + + /// Update timeline state with peer safekeeper data. + pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> { + let mut shared_state = self.mutex.lock().unwrap(); + // Note: the check is too restrictive, generally we can update local + // commit_lsn if our history matches (is part of) history of advanced + // commit_lsn provider. + if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term) + { + if last_log_term == shared_state.sk.get_epoch() { + shared_state.sk.global_commit_lsn = + max(commit_lsn, shared_state.sk.global_commit_lsn); + shared_state.sk.update_commit_lsn()?; + let local_commit_lsn = min(commit_lsn, shared_state.sk.wal_store.flush_lsn()); + shared_state.sk.inmem.commit_lsn = + max(local_commit_lsn, shared_state.sk.inmem.commit_lsn); + } + } + if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn { + shared_state.sk.inmem.s3_wal_lsn = max(s3_wal_lsn, shared_state.sk.inmem.s3_wal_lsn); + } + if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn { + shared_state.sk.inmem.remote_consistent_lsn = max( + remote_consistent_lsn, + shared_state.sk.inmem.remote_consistent_lsn, + ); + } + if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn { + shared_state.sk.inmem.peer_horizon_lsn = + max(peer_horizon_lsn, shared_state.sk.inmem.peer_horizon_lsn); + } + // TODO: sync control file + Ok(()) } pub fn add_replica(&self, state: ReplicaState) -> usize { @@ -495,7 +555,7 @@ impl GlobalTimelines { } /// Get a timeline with control file loaded from the global TIMELINES map. - /// If control file doesn't exist, bails out. + /// If control file doesn't exist and create=false, bails out. pub fn get( conf: &SafeKeeperConf, zttid: ZTenantTimelineId, @@ -537,4 +597,14 @@ impl GlobalTimelines { } } } + + /// Get ZTenantTimelineIDs of all active timelines. + pub fn get_active_timelines() -> Vec { + let timelines = TIMELINES.lock().unwrap(); + timelines + .iter() + .filter(|&(_, tli)| tli.is_active()) + .map(|(zttid, _)| *zttid) + .collect() + } }
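
A minimal sketch of a broker consumer (not part of this patch; the function name and error handling are illustrative only): it reads back what push_loop publishes, relying solely on the key layout produced by timeline_safekeeper_path() and the JSON-encoded SafekeeperInfo, and assumes a crate that depends on walkeeper, etcd-client, anyhow and serde_json.

use anyhow::Result;
use etcd_client::{Client, GetOptions};
use walkeeper::broker::SafekeeperInfo;
use zenith_utils::zid::ZTenantTimelineId;

/// Fetch the info every safekeeper has published for one timeline.
async fn fetch_safekeeper_info(
    broker_endpoints: &[String],
    zttid: &ZTenantTimelineId,
) -> Result<Vec<(String, SafekeeperInfo)>> {
    let mut client = Client::connect(broker_endpoints, None).await?;
    // Same prefix that timeline_safekeeper_path() writes to:
    // zenith/<tenant_id>/<timeline_id>/safekeeper/<node_id>
    let prefix = format!("zenith/{}/{}/safekeeper/", zttid.tenant_id, zttid.timeline_id);
    let resp = client
        .get(prefix, Some(GetOptions::new().with_prefix()))
        .await?;
    let mut result = Vec::new();
    for kv in resp.kvs() {
        let info: SafekeeperInfo = serde_json::from_str(kv.value_str()?)?;
        result.push((kv.key_str()?.to_string(), info));
    }
    Ok(result)
}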