Add safekeeper information exchange through etcd.

Safekeepers now publish per-timeline data to etcd and pull it from there. The
immediate goal is WAL truncation, for which every safekeeper must know
remote_consistent_lsn; the next will be callmemaybe replacement.

Adds the corresponding '--broker-endpoints' argument to the safekeeper and the
ability to run etcd in tests.

Adds a test checking that remote_consistent_lsn is indeed communicated.
Author: Arseny Sher
Date: 2022-03-17 15:14:16 +03:00
Parent: 9594362f74
Commit: ec3bc74165
17 changed files with 726 additions and 40 deletions
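For orientation, here is a minimal, stand-alone sketch (not part of the commit) of what a single safekeeper push to the broker looks like, assuming the key layout and JSON encoding from walkeeper/src/broker.rs below; the etcd endpoint, tenant/timeline IDs and LSN values are made-up examples.

```rust
// Hypothetical example; the real logic lives in broker::push_loop.
use etcd_client::{Client, PutOptions};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed local etcd endpoint, as used in the tests.
    let mut client = Client::connect(["http://127.0.0.1:2379"], None).await?;

    // Entries are attached to a lease so they expire automatically if the
    // safekeeper stops refreshing them.
    let lease = client.lease_grant(5, None).await?;

    // Key layout: zenith/<tenant_id>/<timeline_id>/safekeeper/<node_id>
    // (IDs below are made up for illustration.)
    let key = "zenith/3d07db1cd7f64b5797f92e1f3162c592/6e8ef9d3b1b643b28e9a5a5f01d2e6f4/safekeeper/1";

    // LSNs are serialized in their display form ("X/Y"), matching the
    // DisplayFromStr annotations on SafekeeperInfo.
    let value = serde_json::json!({
        "last_log_term": 1,
        "flush_lsn": "0/16B9188",
        "commit_lsn": "0/16B9188",
        "s3_wal_lsn": "0/0",
        "remote_consistent_lsn": "0/169C2D0",
        "peer_horizon_lsn": "0/169C2D0",
    })
    .to_string();

    client
        .put(key, value, Some(PutOptions::new().with_lease(lease.id())))
        .await?;
    Ok(())
}
```

Peers watch the `zenith` prefix and merge such records into their in-memory state (see record_safekeeper_info in the timeline changes below).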

Cargo.lock (generated)

@@ -75,6 +75,27 @@ dependencies = [
"zstd-safe",
]
[[package]]
name = "async-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.52"
@@ -703,6 +724,21 @@ dependencies = [
"termcolor",
]
[[package]]
name = "etcd-client"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585de5039d1ecce74773db49ba4e8107e42be7c2cd0b1a9e7fce27181db7b118"
dependencies = [
"http",
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tower-service",
]
[[package]]
name = "fail"
version = "0.5.0"
@@ -741,6 +777,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "fnv"
version = "1.0.7"
@@ -926,7 +968,7 @@ dependencies = [
"indexmap",
"slab",
"tokio",
"tokio-util",
"tokio-util 0.6.9",
"tracing",
]
@@ -954,6 +996,15 @@ dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1075,6 +1126,18 @@ dependencies = [
"tokio-rustls 0.23.2",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@@ -1308,9 +1371,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.7.14"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2"
dependencies = [
"libc",
"log",
@@ -1328,6 +1391,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.23.1"
@@ -1557,6 +1626,16 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "petgraph"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "phf"
version = "0.8.0"
@@ -1776,6 +1855,59 @@ dependencies = [
"thiserror",
]
[[package]]
name = "prost"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
dependencies = [
"bytes",
"prost",
]
[[package]]
name = "proxy"
version = "0.1.0"
@@ -1979,7 +2111,7 @@ dependencies = [
"serde_urlencoded",
"tokio",
"tokio-rustls 0.23.2",
"tokio-util",
"tokio-util 0.6.9",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
@@ -2508,9 +2640,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.16.1"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [
"bytes",
"libc",
@@ -2520,10 +2652,21 @@ dependencies = [
"once_cell",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
@@ -2554,7 +2697,7 @@ dependencies = [
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
"socket2",
"tokio",
"tokio-util",
"tokio-util 0.6.9",
]
[[package]]
@@ -2576,7 +2719,7 @@ dependencies = [
"postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)",
"socket2",
"tokio",
"tokio-util",
"tokio-util 0.6.9",
]
[[package]]
@@ -2641,6 +2784,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
@@ -2663,6 +2820,75 @@ dependencies = [
"serde",
]
[[package]]
name = "tonic"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
dependencies = [
"async-stream",
"async-trait",
"base64 0.13.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util 0.6.9",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand",
"slab",
"tokio",
"tokio-util 0.7.0",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-service"
version = "0.3.1"
@@ -2676,6 +2902,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2768,6 +2995,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
[[package]]
name = "unicode-width"
version = "0.1.9"
@@ -2838,6 +3071,7 @@ dependencies = [
"const_format",
"crc32c",
"daemonize",
"etcd-client",
"fs2",
"hex",
"humantime",
@@ -2850,11 +3084,13 @@ dependencies = [
"rust-s3",
"serde",
"serde_json",
"serde_with",
"signal-hook",
"tempfile",
"tokio",
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
"tracing",
"url",
"walkdir",
"workspace_hack",
"zenith_metrics",


@@ -57,6 +57,10 @@ pub struct LocalEnv {
#[serde(default)]
pub private_key_path: PathBuf,
// Comma-separated broker (etcd) endpoints for storage node coordination, e.g. 'http://127.0.0.1:2379'.
#[serde(default)]
pub broker_endpoints: Option<String>,
pub pageserver: PageServerConf,
#[serde(default)]


@@ -73,6 +73,8 @@ pub struct SafekeeperNode {
pub http_base_url: String,
pub pageserver: Arc<PageServerNode>,
broker_endpoints: Option<String>,
}
impl SafekeeperNode {
@@ -89,6 +91,7 @@ impl SafekeeperNode {
http_client: Client::new(),
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
pageserver,
broker_endpoints: env.broker_endpoints.clone(),
}
}
@@ -135,6 +138,9 @@ impl SafekeeperNode {
if !self.conf.sync {
cmd.arg("--no-sync");
}
if let Some(ref ep) = self.broker_endpoints {
cmd.args(&["--broker-endpoints", ep]);
}
if !cmd.status()?.success() {
bail!(


@@ -10,6 +10,8 @@ Prerequisites:
below to run from other directories.
- The zenith git repo, including the postgres submodule
(for some tests, e.g. `pg_regress`)
- Some tests (involving storage node coordination) require etcd to be installed. Follow
[the install guide](https://etcd.io/docs/v3.5/install/) to obtain it.
### Test Organization


@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -22,6 +22,7 @@ from typing import List, Optional, Any
# succeed and data is written
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
@@ -326,6 +327,49 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
proc.join()
# Test that safekeepers push their info to the broker and learn peer status from it
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_broker(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_broker", "main")
pg = env.postgres.create_start('test_broker')
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
# learn zenith timeline from compute
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses is {stat_before}")
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# and wait till remote_consistent_lsn propagates to all safekeepers
started_at = time.time()
while True:
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
if all(
lsn_from_hex(s_after.remote_consistent_lsn) > lsn_from_hex(
s_before.remote_consistent_lsn) for s_after,
s_before in zip(stat_after, stat_before)):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
)
time.sleep(0.5)
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,


@@ -1,4 +1,5 @@
import os
import shutil
import subprocess
from typing import Any, List
@@ -76,3 +77,8 @@ def print_gc_result(row):
log.info(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
.format_map(row))
# path to etcd binary or None if not present.
def etcd_path():
return shutil.which("etcd")


@@ -33,7 +33,7 @@ from typing_extensions import Literal
import requests
import backoff # type: ignore
from .utils import (get_self_dir, lsn_from_hex, mkdir_if_needed, subprocess_capture)
from .utils import (etcd_path, get_self_dir, mkdir_if_needed, subprocess_capture, lsn_from_hex)
from fixtures.log_helper import log
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -433,7 +433,8 @@ class ZenithEnvBuilder:
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME):
default_branch_name=DEFAULT_BRANCH_NAME,
broker: bool = False):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
@@ -442,6 +443,7 @@ class ZenithEnvBuilder:
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.default_branch_name = default_branch_name
self.broker = broker
self.env: Optional[ZenithEnv] = None
self.s3_mock_server: Optional[MockS3Server] = None
@@ -517,6 +519,8 @@ class ZenithEnvBuilder:
self.env.pageserver.stop(immediate=True)
if self.s3_mock_server:
self.s3_mock_server.kill()
if self.env.broker is not None:
self.env.broker.stop()
class ZenithEnv:
@@ -569,6 +573,16 @@ class ZenithEnv:
default_tenant_id = '{self.initial_tenant.hex}'
""")
self.broker = None
if config.broker:
# keep etcd datadir inside 'repo'
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
toml += textwrap.dedent(f"""
broker_endpoints = 'http://127.0.0.1:{self.broker.port}'
""")
# Create config for pageserver
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
@@ -611,12 +625,15 @@ class ZenithEnv:
self.zenith_cli.init(toml)
def start(self):
# Start up the page server and all the safekeepers
# Start up the page server, all the safekeepers and the broker
self.pageserver.start()
for safekeeper in self.safekeepers:
safekeeper.start()
if self.broker is not None:
self.broker.start()
def get_safekeeper_connstrs(self) -> str:
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
@@ -1674,6 +1691,7 @@ class Safekeeper:
class SafekeeperTimelineStatus:
acceptor_epoch: int
flush_lsn: str
remote_consistent_lsn: str
@dataclass
@@ -1697,7 +1715,8 @@ class SafekeeperHttpClient(requests.Session):
res.raise_for_status()
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'])
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
@@ -1718,6 +1737,54 @@ class SafekeeperHttpClient(requests.Session):
return metrics
@dataclass
class Etcd:
""" An object managing etcd instance """
datadir: str
port: int
peer_port: int
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def check_status(self):
s = requests.Session()
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"http://localhost:{self.port}/health").raise_for_status()
def start(self):
pathlib.Path(self.datadir).mkdir(exist_ok=True)
etcd_full_path = etcd_path()
if etcd_full_path is None:
raise Exception('etcd not found')
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
args = [
etcd_full_path,
f"--data-dir={self.datadir}",
f"--listen-client-urls=http://localhost:{self.port}",
f"--advertise-client-urls=http://localhost:{self.port}",
f"--listen-peer-urls=http://localhost:{self.peer_port}"
]
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)
# wait for start
started_at = time.time()
while True:
try:
self.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 5:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for etcd start: {e}")
time.sleep(0.5)
else:
break # success
def stop(self):
if self.handle is not None:
self.handle.terminate()
self.handle.wait()
def get_test_output_dir(request: Any) -> str:
""" Compute the working directory for an individual test. """
test_name = request.node.name


@@ -22,11 +22,14 @@ anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
walkdir = "2"
url = "2.2.2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
serde_with = {version = "1.12.0"}
hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
etcd-client = "0.8.3"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }


@@ -11,18 +11,19 @@ use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use url::{ParseError, Url};
use walkeeper::control_file::{self};
use zenith_utils::http::endpoint;
use zenith_utils::zid::ZNodeId;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use tokio::sync::mpsc;
use walkeeper::callmemaybe;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::SafeKeeperConf;
use walkeeper::{broker, callmemaybe};
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
@@ -104,6 +105,11 @@ fn main() -> Result<()> {
)
.arg(
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
).arg(
Arg::new("broker-endpoints")
.long("broker-endpoints")
.takes_value(true)
.help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'"),
)
.get_matches();
@@ -154,6 +160,11 @@ fn main() -> Result<()> {
));
}
if let Some(addr) = arg_matches.value_of("broker-endpoints") {
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
conf.broker_endpoints = Some(collected_ep?);
}
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}
@@ -259,11 +270,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
threads.push(wal_acceptor_thread);
let conf_cloned = conf.clone();
let callmemaybe_thread = thread::Builder::new()
.name("callmemaybe thread".into())
.spawn(|| {
// thread code
let thread_result = callmemaybe::thread_main(conf, rx);
let thread_result = callmemaybe::thread_main(conf_cloned, rx);
if let Err(e) = thread_result {
error!("callmemaybe thread terminated: {}", e);
}
@@ -271,6 +283,17 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
.unwrap();
threads.push(callmemaybe_thread);
if conf.broker_endpoints.is_some() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
}
// TODO: put more thought into handling of failed threads.
// We probably should restart them.

walkeeper/src/broker.rs (new file)

@@ -0,0 +1,211 @@
//! Communication with etcd, providing coordination between safekeeper peers and the pageserver.
use anyhow::bail;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_client::Client;
use etcd_client::EventType;
use etcd_client::PutOptions;
use etcd_client::WatchOptions;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::str::FromStr;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTimelineId;
use zenith_utils::{
lsn::Lsn,
zid::{ZNodeId, ZTenantTimelineId},
};
use crate::{safekeeper::Term, timeline::GlobalTimelines, SafeKeeperConf};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
const LEASE_TTL_SEC: i64 = 5;
// TODO: add global zenith installation ID.
const ZENITH_PREFIX: &str = "zenith";
/// Published data about safekeeper. Fields made optional for easy migrations.
#[serde_as]
#[derive(Deserialize, Serialize)]
pub struct SafekeeperInfo {
/// Term of the last entry.
pub last_log_term: Option<Term>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
#[serde_as(as = "Option<DisplayFromStr>")]
pub s3_wal_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub peer_horizon_lsn: Option<Lsn>,
}
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _enter = info_span!("broker").entered();
info!("started, broker endpoints {:?}", conf.broker_endpoints);
runtime.block_on(async {
main_loop(conf).await;
});
}
/// Prefix to timeline related data.
fn timeline_path(zttid: &ZTenantTimelineId) -> String {
format!(
"{}/{}/{}",
ZENITH_PREFIX, zttid.tenant_id, zttid.timeline_id
)
}
/// Key to per timeline per safekeeper data.
fn timeline_safekeeper_path(zttid: &ZTenantTimelineId, sk_id: ZNodeId) -> String {
format!("{}/safekeeper/{}", timeline_path(zttid), sk_id)
}
/// Periodically push data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
// Get and maintain lease to automatically delete obsolete data
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (mut keeper, mut ka_stream) = client.lease_keep_alive(lease.id()).await?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
loop {
// Note: we lock runtime here and in timeline methods as GlobalTimelines
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
let sk_info = tli.get_public_info();
let put_opts = PutOptions::new().with_lease(lease.id());
client
.put(
timeline_safekeeper_path(zttid, conf.my_id),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.context("failed to push safekeeper info")?;
}
}
// revive the lease
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
lazy_static! {
static ref TIMELINE_SAFEKEEPER_RE: Regex =
Regex::new(r"^zenith/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$")
.unwrap();
}
let mut client = Client::connect(conf.broker_endpoints.as_ref().unwrap(), None).await?;
loop {
let wo = WatchOptions::new().with_prefix();
// TODO: subscribe only to my timelines
let (_, mut stream) = client.watch(ZENITH_PREFIX, Some(wo)).await?;
while let Some(resp) = stream.message().await? {
if resp.canceled() {
bail!("watch canceled");
}
for event in resp.events() {
if EventType::Put == event.event_type() {
if let Some(kv) = event.kv() {
if let Some(caps) = TIMELINE_SAFEKEEPER_RE.captures(kv.key_str()?) {
let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let zttid = ZTenantTimelineId::new(tenant_id, timeline_id);
let safekeeper_id = ZNodeId(caps.get(3).unwrap().as_str().parse()?);
let value_str = kv.value_str()?;
match serde_json::from_str::<SafekeeperInfo>(value_str) {
Ok(safekeeper_info) => {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
tli.record_safekeeper_info(&safekeeper_info, safekeeper_id)?
}
}
Err(err) => warn!(
"failed to deserialize safekeeper info {}: {}",
value_str, err
),
}
}
}
}
}
}
}
}
async fn main_loop(conf: SafeKeeperConf) {
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
// Selecting on JoinHandles requires some squats; is there a better way to
// reap tasks individually?
// Handling failures inside the task itself won't catch a panic, and in Tokio a
// task's panic doesn't kill the whole executor, so it is better to do the
// reaping here.
loop {
tokio::select! {
res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
// was it panic or normal error?
let err = match res {
Ok(res_internal) => res_internal.unwrap_err(),
Err(err_outer) => err_outer.into(),
};
warn!("push task failed: {:?}", err);
push_handle = None;
},
res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
// was it panic or normal error?
let err = match res {
Ok(res_internal) => res_internal.unwrap_err(),
Err(err_outer) => err_outer.into(),
};
warn!("pull task failed: {:?}", err);
pull_handle = None;
},
_ = ticker.tick() => {
if push_handle.is_none() {
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
}
if pull_handle.is_none() {
pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
}
}
}
}
}


@@ -168,7 +168,14 @@ impl SafekeeperPostgresHandler {
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
let start_pos = self.timeline.get().get_end_of_wal();
let lsn = start_pos.to_string();
let sysid = self.timeline.get().get_info().server.system_id.to_string();
let sysid = self
.timeline
.get()
.get_state()
.1
.server
.system_id
.to_string();
let lsn_bytes = lsn.as_bytes();
let tli = PG_TLI.to_string();
let tli_bytes = tli.as_bytes();


@@ -86,23 +86,24 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
);
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
let sk_state = tli.get_info();
let (inmem, state) = tli.get_state();
let flush_lsn = tli.get_end_of_wal();
let acc_state = AcceptorStateStatus {
term: sk_state.acceptor_state.term,
epoch: sk_state.acceptor_state.get_epoch(flush_lsn),
term_history: sk_state.acceptor_state.term_history,
term: state.acceptor_state.term,
epoch: state.acceptor_state.get_epoch(flush_lsn),
term_history: state.acceptor_state.term_history,
};
// Note: we report in-memory values which can be lost.
let status = TimelineStatus {
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: acc_state,
commit_lsn: sk_state.commit_lsn,
s3_wal_lsn: sk_state.s3_wal_lsn,
peer_horizon_lsn: sk_state.peer_horizon_lsn,
remote_consistent_lsn: sk_state.remote_consistent_lsn,
commit_lsn: inmem.commit_lsn,
s3_wal_lsn: inmem.s3_wal_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: inmem.remote_consistent_lsn,
flush_lsn,
};
Ok(json_response(StatusCode::OK, status)?)


@@ -73,7 +73,7 @@ pub fn handle_json_ctrl(
let inserted_wal = append_logical_message(spg, append_request)?;
let response = AppendResult {
state: spg.timeline.get().get_info(),
state: spg.timeline.get().get_state().1,
inserted_wal,
};
let response_data = serde_json::to_vec(&response)?;
@@ -112,7 +112,7 @@ fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history
let history = spg.timeline.get().get_info().acceptor_state.term_history;
let history = spg.timeline.get().get_state().1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
@@ -142,7 +142,7 @@ fn append_logical_message(
msg: &AppendLogicalMessage,
) -> Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = spg.timeline.get().get_info();
let sk_state = spg.timeline.get().get_state().1;
let begin_lsn = msg.begin_lsn;
let end_lsn = begin_lsn + wal_data.len() as u64;


@@ -1,9 +1,11 @@
//
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
pub mod broker;
pub mod callmemaybe;
pub mod control_file;
pub mod control_file_upgrade;
@@ -47,6 +49,7 @@ pub struct SafeKeeperConf {
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub my_id: ZNodeId,
pub broker_endpoints: Option<Vec<Url>>,
}
impl SafeKeeperConf {
@@ -71,6 +74,7 @@ impl Default for SafeKeeperConf {
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
broker_endpoints: None,
}
}
}


@@ -193,7 +193,7 @@ pub struct SafeKeeperState {
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver.
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
@@ -203,11 +203,13 @@ pub struct SafeKeeperState {
}
#[derive(Debug, Clone)]
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; they are
// not flushed yet.
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
// are not flushed yet.
pub struct SafekeeperMemState {
pub commit_lsn: Lsn,
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
}
impl SafeKeeperState {
@@ -494,14 +496,13 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
metrics: SafeKeeperMetrics,
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
global_commit_lsn: Lsn,
pub global_commit_lsn: Lsn,
/// LSN from which the proposer currently talking to this safekeeper appends WAL;
/// determines epoch switch point.
epoch_start_lsn: Lsn,
pub inmem: SafekeeperMemState, // in memory part
pub s: SafeKeeperState, // persistent part
pub s: SafeKeeperState, // persistent part
pub control_store: CTRL,
pub wal_store: WAL,
@@ -529,7 +530,9 @@ where
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
s3_wal_lsn: state.s3_wal_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
},
s: state,
control_store,
@@ -545,8 +548,7 @@ where
.up_to(self.wal_store.flush_lsn())
}
#[cfg(test)]
fn get_epoch(&self) -> Term {
pub fn get_epoch(&self) -> Term {
self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn())
}
@@ -697,7 +699,7 @@ where
}
/// Advance commit_lsn taking into account what we have locally
fn update_commit_lsn(&mut self) -> Result<()> {
pub fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);


@@ -230,7 +230,7 @@ impl ReplicationConn {
let mut wal_seg_size: usize;
loop {
wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize;
wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize;
if wal_seg_size == 0 {
error!("Cannot start replication before connecting to wal_proposer");
sleep(Duration::from_secs(1));


@@ -17,12 +17,14 @@ use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
use crate::broker::SafekeeperInfo;
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file;
use crate::control_file::Storage as cf_storage;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState,
};
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
@@ -349,6 +351,11 @@ impl Timeline {
Ok(false)
}
fn is_active(&self) -> bool {
let shared_state = self.mutex.lock().unwrap();
shared_state.active
}
/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
@@ -410,8 +417,61 @@ impl Timeline {
Ok(rmsg)
}
pub fn get_info(&self) -> SafeKeeperState {
self.mutex.lock().unwrap().sk.s.clone()
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let shared_state = self.mutex.lock().unwrap();
(shared_state.sk.inmem.clone(), shared_state.sk.s.clone())
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self) -> SafekeeperInfo {
let shared_state = self.mutex.lock().unwrap();
SafekeeperInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
}
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
// Note: the check is too restrictive; generally we can update the local
// commit_lsn if our history matches (is a part of) the history of the
// advanced commit_lsn provider.
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
{
if last_log_term == shared_state.sk.get_epoch() {
shared_state.sk.global_commit_lsn =
max(commit_lsn, shared_state.sk.global_commit_lsn);
shared_state.sk.update_commit_lsn()?;
let local_commit_lsn = min(commit_lsn, shared_state.sk.wal_store.flush_lsn());
shared_state.sk.inmem.commit_lsn =
max(local_commit_lsn, shared_state.sk.inmem.commit_lsn);
}
}
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
shared_state.sk.inmem.s3_wal_lsn = max(s3_wal_lsn, shared_state.sk.inmem.s3_wal_lsn);
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
shared_state.sk.inmem.remote_consistent_lsn = max(
remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
);
}
if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
shared_state.sk.inmem.peer_horizon_lsn =
max(peer_horizon_lsn, shared_state.sk.inmem.peer_horizon_lsn);
}
// TODO: sync control file
Ok(())
}
pub fn add_replica(&self, state: ReplicaState) -> usize {
@@ -495,7 +555,7 @@ impl GlobalTimelines {
}
/// Get a timeline with control file loaded from the global TIMELINES map.
/// If control file doesn't exist, bails out.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
@@ -537,4 +597,14 @@ impl GlobalTimelines {
}
}
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
let timelines = TIMELINES.lock().unwrap();
timelines
.iter()
.filter(|&(_, tli)| tli.is_active())
.map(|(zttid, _)| *zttid)
.collect()
}
}