From 52e0816fa5a19bb741c7b053a4f6ae88bb4ff9c8 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 18 Apr 2022 11:49:46 +0300 Subject: [PATCH] wal_acceptor -> safekeeper --- control_plane/src/compute.rs | 4 +-- control_plane/src/safekeeper.rs | 2 +- safekeeper/src/bin/safekeeper.rs | 8 ++--- test_runner/batch_others/test_auth.py | 8 ++--- .../batch_others/test_restart_compute.py | 6 ++-- test_runner/batch_others/test_tenants.py | 18 +++++------ test_runner/batch_others/test_wal_acceptor.py | 32 +++++++++---------- .../batch_others/test_wal_acceptor_async.py | 6 ++-- test_runner/fixtures/log_helper.py | 2 +- test_runner/fixtures/zenith_fixtures.py | 8 ++--- .../performance/test_bulk_tenant_create.py | 12 +++---- zenith/src/main.rs | 6 ++-- 12 files changed, 56 insertions(+), 56 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 64cd46fef6..1c979acbdf 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -331,14 +331,14 @@ impl PostgresNode { // Configure the node to connect to the safekeepers conf.append("synchronous_standby_names", "walproposer"); - let wal_acceptors = self + let safekeepers = self .env .safekeepers .iter() .map(|sk| format!("localhost:{}", sk.pg_port)) .collect::>() .join(","); - conf.append("wal_acceptors", &wal_acceptors); + conf.append("wal_acceptors", &safekeepers); } else { // We only use setup without safekeepers for tests, // and don't care about data durability on pageserver, diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index e23138bd3f..6f11a4e03d 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -13,8 +13,8 @@ use nix::unistd::Pid; use postgres::Config; use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; -use thiserror::Error; use safekeeper::http::models::TimelineCreateRequest; +use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; use 
zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 490198231d..e191cb52fd 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -257,18 +257,18 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b let (tx, rx) = mpsc::unbounded_channel(); let conf_cloned = conf.clone(); - let wal_acceptor_thread = thread::Builder::new() - .name("WAL acceptor thread".into()) + let safekeeper_thread = thread::Builder::new() + .name("Safekeeper thread".into()) .spawn(|| { // thread code let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx); if let Err(e) = thread_result { - info!("wal_service thread terminated: {}", e); + info!("safekeeper thread terminated: {}", e); } }) .unwrap(); - threads.push(wal_acceptor_thread); + threads.push(safekeeper_thread); let conf_cloned = conf.clone(); let callmemaybe_thread = thread::Builder::new() diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index bda6349ef9..a8ad384f27 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -52,14 +52,14 @@ def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder): tenant_http_client.tenant_create() -@pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): +@pytest.mark.parametrize('with_safekeepers', [False, True]) +def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool): zenith_env_builder.pageserver_auth_enabled = True - if with_wal_acceptors: + if with_safekeepers: zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - branch = f'test_compute_auth_to_pageserver{with_wal_acceptors}' + branch = f'test_compute_auth_to_pageserver{with_safekeepers}' env.zenith_cli.create_branch(branch) pg = 
env.postgres.create_start(branch) diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index fd06561c00..d6e7fd9e0d 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -8,10 +8,10 @@ from fixtures.log_helper import log # # Test restarting and recreating a postgres instance # -@pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): +@pytest.mark.parametrize('with_safekeepers', [False, True]) +def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool): zenith_env_builder.pageserver_auth_enabled = True - if with_wal_acceptors: + if with_safekeepers: zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py index e883018628..682af8de49 100644 --- a/test_runner/batch_others/test_tenants.py +++ b/test_runner/batch_others/test_tenants.py @@ -5,9 +5,9 @@ import pytest from fixtures.zenith_fixtures import ZenithEnvBuilder -@pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): - if with_wal_acceptors: +@pytest.mark.parametrize('with_safekeepers', [False, True]) +def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool): + if with_safekeepers: zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() @@ -15,17 +15,17 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce tenant_1 = env.zenith_cli.create_tenant() tenant_2 = env.zenith_cli.create_tenant() - env.zenith_cli.create_timeline( - f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_1) - env.zenith_cli.create_timeline( - 
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_2) + env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}', + tenant_id=tenant_1) + env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}', + tenant_id=tenant_2) pg_tenant1 = env.postgres.create_start( - f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', + f'test_tenants_normal_work_with_safekeepers{with_safekeepers}', tenant_id=tenant_1, ) pg_tenant2 = env.postgres.create_start( - f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', + f'test_tenants_normal_work_with_safekeepers{with_safekeepers}', tenant_id=tenant_2, ) diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index dffcd7cc61..cc9ec9a275 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -25,8 +25,8 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.broker = True env = zenith_env_builder.init_start() - env.zenith_cli.create_branch('test_wal_acceptors_normal_work') - pg = env.postgres.create_start('test_wal_acceptors_normal_work') + env.zenith_cli.create_branch('test_safekeepers_normal_work') + pg = env.postgres.create_start('test_safekeepers_normal_work') with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -56,7 +56,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): n_timelines = 3 branch_names = [ - "test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines) + "test_safekeepers_many_timelines_{}".format(tlin) for tlin in range(n_timelines) ] # pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418') # that's not really human readable, so the branch names are introduced in Zenith CLI. 
@@ -196,8 +196,8 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = n_acceptors env = zenith_env_builder.init_start() - env.zenith_cli.create_branch('test_wal_acceptors_restarts') - pg = env.postgres.create_start('test_wal_acceptors_restarts') + env.zenith_cli.create_branch('test_safekeepers_restarts') + pg = env.postgres.create_start('test_safekeepers_restarts') # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -223,7 +223,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder): start_delay_sec = 2 -def delayed_wal_acceptor_start(wa): +def delayed_safekeeper_start(wa): time.sleep(start_delay_sec) wa.start() @@ -233,8 +233,8 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 2 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch('test_wal_acceptors_unavailability') - pg = env.postgres.create_start('test_wal_acceptors_unavailability') + env.zenith_cli.create_branch('test_safekeepers_unavailability') + pg = env.postgres.create_start('test_safekeepers_unavailability') # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -248,7 +248,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder): # shutdown one of two acceptors, that is, majority env.safekeepers[0].stop() - proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[0], )) + proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[0], )) proc.start() start = time.time() @@ -260,7 +260,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder): # for the world's balance, do the same with second acceptor env.safekeepers[1].stop() - proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[1], )) + proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[1], )) proc.start() start = time.time() @@ -304,8 +304,8 @@ def 
test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch('test_wal_acceptors_race_conditions') - pg = env.postgres.create_start('test_wal_acceptors_race_conditions') + env.zenith_cli.create_branch('test_safekeepers_race_conditions') + pg = env.postgres.create_start('test_safekeepers_race_conditions') # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -396,7 +396,7 @@ class ProposerPostgres(PgProtocol): """ Path to postgresql.conf """ return os.path.join(self.pgdata_dir, 'postgresql.conf') - def create_dir_config(self, wal_acceptors: str): + def create_dir_config(self, safekeepers: str): """ Create dir and config for running --sync-safekeepers """ mkdir_if_needed(self.pg_data_dir_path()) @@ -407,7 +407,7 @@ class ProposerPostgres(PgProtocol): f"zenith.zenith_timeline = '{self.timeline_id.hex}'\n", f"zenith.zenith_tenant = '{self.tenant_id.hex}'\n", f"zenith.page_server_connstring = ''\n", - f"wal_acceptors = '{wal_acceptors}'\n", + f"wal_acceptors = '{safekeepers}'\n", f"listen_addresses = '{self.listen_addr}'\n", f"port = '{self.port}'\n", ] @@ -692,7 +692,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): env.safekeepers[3].stop() active_safekeepers = [1, 2, 3] pg = env.postgres.create('test_replace_safekeeper') - pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) + pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) pg.start() # learn zenith timeline from compute @@ -732,7 +732,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): pg.stop_and_destroy().create('test_replace_safekeeper') active_safekeepers = [2, 3, 4] env.safekeepers[3].start() - pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) + pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) pg.start() execute_payload(pg) diff --git 
a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index aadafc76cf..e3df8ea3eb 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ b/test_runner/batch_others/test_wal_acceptor_async.py @@ -9,7 +9,7 @@ from fixtures.log_helper import getLogger from fixtures.utils import lsn_from_hex, lsn_to_hex from typing import List -log = getLogger('root.wal_acceptor_async') +log = getLogger('root.safekeeper_async') class BankClient(object): @@ -207,9 +207,9 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch('test_wal_acceptors_restarts_under_load') + env.zenith_cli.create_branch('test_safekeepers_restarts_under_load') # Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long - pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load', + pg = env.postgres.create_start('test_safekeepers_restarts_under_load', config_lines=['max_replication_write_lag=1MB']) asyncio.run(run_restarts_under_load(env, pg, env.safekeepers)) diff --git a/test_runner/fixtures/log_helper.py b/test_runner/fixtures/log_helper.py index 9aa5f40bf3..7c2d83d4e3 100644 --- a/test_runner/fixtures/log_helper.py +++ b/test_runner/fixtures/log_helper.py @@ -25,7 +25,7 @@ LOGGING = { "root": { "level": "INFO" }, - "root.wal_acceptor_async": { + "root.safekeeper_async": { "level": "INFO" # a lot of logs on DEBUG level } } diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index f8ee39a5a1..e0f08a3bfb 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -612,7 +612,7 @@ class ZenithEnv: self.broker.start() def get_safekeeper_connstrs(self) -> str: - """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """ + """ Get list of safekeeper endpoints suitable 
for safekeepers GUC """ return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) @cached_property @@ -1484,7 +1484,7 @@ class Postgres(PgProtocol): """ Path to postgresql.conf """ return os.path.join(self.pg_data_dir_path(), 'postgresql.conf') - def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': + def adjust_for_safekeepers(self, safekeepers: str) -> 'Postgres': """ Adjust instance config for working with wal acceptors instead of pageserver (pre-configured by CLI) directly. @@ -1499,12 +1499,12 @@ class Postgres(PgProtocol): if ("synchronous_standby_names" in cfg_line or # don't ask pageserver to fetch WAL from compute "callmemaybe_connstring" in cfg_line or - # don't repeat wal_acceptors multiple times + # don't repeat safekeepers/wal_acceptors multiple times "wal_acceptors" in cfg_line): continue f.write(cfg_line) f.write("synchronous_standby_names = 'walproposer'\n") - f.write("wal_acceptors = '{}'\n".format(wal_acceptors)) + f.write("wal_acceptors = '{}'\n".format(safekeepers)) return self def config(self, lines: List[str]) -> 'Postgres': diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index fbef131ffd..f0729d3a07 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -13,15 +13,15 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder @pytest.mark.parametrize('tenants_count', [1, 5, 10]) -@pytest.mark.parametrize('use_wal_acceptors', ['with_wa', 'without_wa']) +@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa']) def test_bulk_tenant_create( zenith_env_builder: ZenithEnvBuilder, - use_wal_acceptors: str, + use_safekeepers: str, tenants_count: int, zenbenchmark, ): """Measure tenant creation time (with and without wal acceptors)""" - if use_wal_acceptors == 'with_wa': + if use_safekeepers == 'with_wa': zenith_env_builder.num_safekeepers = 3 env = 
zenith_env_builder.init_start() @@ -32,14 +32,14 @@ def test_bulk_tenant_create( tenant = env.zenith_cli.create_tenant() env.zenith_cli.create_timeline( - f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant) + f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now? - #if use_wal_acceptors == 'with_wa': + #if use_safekeepers == 'with_wa': # wa_factory.start_n_new(3) pg_tenant = env.postgres.create_start( - f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant) + f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) end = timeit.default_timer() time_slices.append(end - start) diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 97b07b7b74..18368895a4 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -9,13 +9,13 @@ use pageserver::config::defaults::{ DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR, DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR, }; -use std::collections::{BTreeSet, HashMap}; -use std::process::exit; -use std::str::FromStr; use safekeeper::defaults::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, }; +use std::collections::{BTreeSet, HashMap}; +use std::process::exit; +use std::str::FromStr; use zenith_utils::auth::{Claims, Scope}; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::AuthType;