wal_acceptor -> safekeeper

Kirill Bulatov
2022-04-18 11:49:46 +03:00
committed by Kirill Bulatov
parent 81417788c8
commit 52e0816fa5
12 changed files with 56 additions and 56 deletions

View File

@@ -331,14 +331,14 @@ impl PostgresNode {
             // Configure the node to connect to the safekeepers
             conf.append("synchronous_standby_names", "walproposer");
-            let wal_acceptors = self
+            let safekeepers = self
                 .env
                 .safekeepers
                 .iter()
                 .map(|sk| format!("localhost:{}", sk.pg_port))
                 .collect::<Vec<String>>()
                 .join(",");
-            conf.append("wal_acceptors", &wal_acceptors);
+            conf.append("wal_acceptors", &safekeepers);
         } else {
             // We only use setup without safekeepers for tests,
             // and don't care about data durability on pageserver,
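The rename above touches only the Rust binding; the GUC written to the config is still `wal_acceptors`. As a minimal illustration, here is the same connstring assembly in Python, with three hypothetical safekeeper ports:

```python
# Mirrors the iterator chain above; the ports are made up.
pg_ports = [5454, 5455, 5456]
safekeepers = ",".join(f"localhost:{port}" for port in pg_ports)
assert safekeepers == "localhost:5454,localhost:5455,localhost:5456"
```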

View File

@@ -13,8 +13,8 @@ use nix::unistd::Pid;
 use postgres::Config;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
-use thiserror::Error;
-use wal_acceptor::http::models::TimelineCreateRequest;
+use safekeeper::http::models::TimelineCreateRequest;
+use thiserror::Error;
 use zenith_utils::http::error::HttpErrorBody;
 use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

View File

@@ -257,18 +257,18 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
     let (tx, rx) = mpsc::unbounded_channel();
 
     let conf_cloned = conf.clone();
-    let wal_acceptor_thread = thread::Builder::new()
-        .name("WAL acceptor thread".into())
+    let safekeeper_thread = thread::Builder::new()
+        .name("Safekeeper thread".into())
         .spawn(|| {
             // thread code
             let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx);
             if let Err(e) = thread_result {
-                info!("wal_service thread terminated: {}", e);
+                info!("safekeeper thread terminated: {}", e);
             }
         })
         .unwrap();
-    threads.push(wal_acceptor_thread);
+    threads.push(safekeeper_thread);
 
     let conf_cloned = conf.clone();
     let callmemaybe_thread = thread::Builder::new()
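The renamed thread keeps the same shape: spawn a named worker, run the service loop, and log (rather than propagate) a terminating error. A rough Python analogue of that pattern, with a hypothetical serve() standing in for wal_service::thread_main:

```python
import logging
import threading

logging.basicConfig(level=logging.INFO)

def serve():
    """Hypothetical stand-in for wal_service::thread_main."""
    raise RuntimeError("listener closed")

def run_service():
    try:
        serve()
    except Exception as e:
        # As in the renamed log line: a failure ends the thread, not the process.
        logging.info("safekeeper thread terminated: %s", e)

safekeeper_thread = threading.Thread(target=run_service, name="Safekeeper thread")
safekeeper_thread.start()
safekeeper_thread.join()
```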

View File

@@ -52,14 +52,14 @@ def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
     tenant_http_client.tenant_create()
 
 
-@pytest.mark.parametrize('with_wal_acceptors', [False, True])
-def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
+@pytest.mark.parametrize('with_safekeepers', [False, True])
+def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
     zenith_env_builder.pageserver_auth_enabled = True
-    if with_wal_acceptors:
+    if with_safekeepers:
         zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
 
-    branch = f'test_compute_auth_to_pageserver{with_wal_acceptors}'
+    branch = f'test_compute_auth_to_pageserver{with_safekeepers}'
     env.zenith_cli.create_branch(branch)
     pg = env.postgres.create_start(branch)
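A self-contained sketch of the parametrization pattern used throughout these tests: pytest runs the body once per listed value, so every renamed test still covers both the safekeeper-backed and the safekeeper-less setup. The test name here is hypothetical:

```python
import pytest

@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_example(with_safekeepers: bool):
    # The flag is folded into the branch name, as above, so the two
    # parametrized runs never share a timeline.
    branch = f'test_example{with_safekeepers}'
    assert branch in ('test_exampleFalse', 'test_exampleTrue')
```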

View File

@@ -8,10 +8,10 @@ from fixtures.log_helper import log
 #
 # Test restarting and recreating a postgres instance
 #
-@pytest.mark.parametrize('with_wal_acceptors', [False, True])
-def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
+@pytest.mark.parametrize('with_safekeepers', [False, True])
+def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
     zenith_env_builder.pageserver_auth_enabled = True
-    if with_wal_acceptors:
+    if with_safekeepers:
         zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()

View File

@@ -5,9 +5,9 @@ import pytest
 from fixtures.zenith_fixtures import ZenithEnvBuilder
 
 
-@pytest.mark.parametrize('with_wal_acceptors', [False, True])
-def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
-    if with_wal_acceptors:
+@pytest.mark.parametrize('with_safekeepers', [False, True])
+def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
+    if with_safekeepers:
         zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
@@ -15,17 +15,17 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
     tenant_1 = env.zenith_cli.create_tenant()
     tenant_2 = env.zenith_cli.create_tenant()
 
-    env.zenith_cli.create_timeline(
-        f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_1)
-    env.zenith_cli.create_timeline(
-        f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_2)
+    env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
+                                   tenant_id=tenant_1)
+    env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
+                                   tenant_id=tenant_2)
     pg_tenant1 = env.postgres.create_start(
-        f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
+        f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
         tenant_id=tenant_1,
     )
     pg_tenant2 = env.postgres.create_start(
-        f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
+        f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
         tenant_id=tenant_2,
     )

View File

@@ -25,8 +25,8 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.broker = True
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
-    pg = env.postgres.create_start('test_wal_acceptors_normal_work')
+    env.zenith_cli.create_branch('test_safekeepers_normal_work')
+    pg = env.postgres.create_start('test_safekeepers_normal_work')
 
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur:
@@ -56,7 +56,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
     n_timelines = 3
 
     branch_names = [
-        "test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
+        "test_safekeepers_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
     ]
     # pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418')
     # that's not really human readable, so the branch names are introduced in Zenith CLI.
@@ -196,8 +196,8 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = n_acceptors
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch('test_wal_acceptors_restarts')
-    pg = env.postgres.create_start('test_wal_acceptors_restarts')
+    env.zenith_cli.create_branch('test_safekeepers_restarts')
+    pg = env.postgres.create_start('test_safekeepers_restarts')
 
     # we rely upon autocommit after each statement
     # as waiting for acceptors happens there
@@ -223,7 +223,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
 start_delay_sec = 2
 
 
-def delayed_wal_acceptor_start(wa):
+def delayed_safekeeper_start(wa):
     time.sleep(start_delay_sec)
     wa.start()
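The renamed helper is later handed to multiprocessing.Process, so a stopped safekeeper comes back after a delay while the test keeps running. A self-contained sketch of that pattern, with a dummy object standing in for the Safekeeper fixture:

```python
import time
from multiprocessing import Process

start_delay_sec = 2

class DummySafekeeper:
    """Hypothetical stand-in for the Safekeeper test fixture."""
    def start(self):
        print("safekeeper started")

def delayed_safekeeper_start(wa):
    time.sleep(start_delay_sec)
    wa.start()

if __name__ == '__main__':
    proc = Process(target=delayed_safekeeper_start, args=(DummySafekeeper(), ))
    proc.start()
    proc.join()
```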
@@ -233,8 +233,8 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = 2
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch('test_wal_acceptors_unavailability')
-    pg = env.postgres.create_start('test_wal_acceptors_unavailability')
+    env.zenith_cli.create_branch('test_safekeepers_unavailability')
+    pg = env.postgres.create_start('test_safekeepers_unavailability')
 
     # we rely upon autocommit after each statement
     # as waiting for acceptors happens there
@@ -248,7 +248,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
     # shutdown one of two acceptors, that is, majority
     env.safekeepers[0].stop()
-    proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[0], ))
+    proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[0], ))
     proc.start()
 
     start = time.time()
@@ -260,7 +260,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
     # for the world's balance, do the same with second acceptor
     env.safekeepers[1].stop()
-    proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[1], ))
+    proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[1], ))
     proc.start()
 
     start = time.time()
@@ -304,8 +304,8 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
     zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch('test_wal_acceptors_race_conditions')
-    pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
+    env.zenith_cli.create_branch('test_safekeepers_race_conditions')
+    pg = env.postgres.create_start('test_safekeepers_race_conditions')
 
     # we rely upon autocommit after each statement
     # as waiting for acceptors happens there
@@ -396,7 +396,7 @@ class ProposerPostgres(PgProtocol):
         """ Path to postgresql.conf """
         return os.path.join(self.pgdata_dir, 'postgresql.conf')
 
-    def create_dir_config(self, wal_acceptors: str):
+    def create_dir_config(self, safekeepers: str):
         """ Create dir and config for running --sync-safekeepers """
         mkdir_if_needed(self.pg_data_dir_path())
@@ -407,7 +407,7 @@ class ProposerPostgres(PgProtocol):
             f"zenith.zenith_timeline = '{self.timeline_id.hex}'\n",
             f"zenith.zenith_tenant = '{self.tenant_id.hex}'\n",
             f"zenith.page_server_connstring = ''\n",
-            f"wal_acceptors = '{wal_acceptors}'\n",
+            f"wal_acceptors = '{safekeepers}'\n",
             f"listen_addresses = '{self.listen_addr}'\n",
             f"port = '{self.port}'\n",
         ]
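Note that only the Python parameter is renamed; the GUC written to postgresql.conf keeps its old wal_acceptors name. A sketch of the fragment this method produces, with hypothetical IDs, address, and ports:

```python
# All values below are made up; the real ones come from ProposerPostgres fields.
timeline_id = 'ad50847381e248feaac9876cc71ae418'
tenant_id = 'cf0480929707ee75372337efaa5ecf96'
safekeepers = 'localhost:5454,localhost:5455,localhost:5456'

config = (f"zenith.zenith_timeline = '{timeline_id}'\n"
          f"zenith.zenith_tenant = '{tenant_id}'\n"
          f"zenith.page_server_connstring = ''\n"
          f"wal_acceptors = '{safekeepers}'\n"
          f"listen_addresses = 'localhost'\n"
          f"port = '55432'\n")
print(config, end='')
```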
@@ -692,7 +692,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
     env.safekeepers[3].stop()
     active_safekeepers = [1, 2, 3]
     pg = env.postgres.create('test_replace_safekeeper')
-    pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
+    pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
     pg.start()
 
     # learn zenith timeline from compute
@@ -732,7 +732,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
     pg.stop_and_destroy().create('test_replace_safekeeper')
     active_safekeepers = [2, 3, 4]
     env.safekeepers[3].start()
-    pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
+    pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
     pg.start()
 
     execute_payload(pg)

View File

@@ -9,7 +9,7 @@ from fixtures.log_helper import getLogger
 from fixtures.utils import lsn_from_hex, lsn_to_hex
 from typing import List
 
-log = getLogger('root.wal_acceptor_async')
+log = getLogger('root.safekeeper_async')
 
 
 class BankClient(object):
@@ -207,9 +207,9 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch('test_wal_acceptors_restarts_under_load')
+    env.zenith_cli.create_branch('test_safekeepers_restarts_under_load')
     # Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
-    pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load',
+    pg = env.postgres.create_start('test_safekeepers_restarts_under_load',
                                    config_lines=['max_replication_write_lag=1MB'])
 
     asyncio.run(run_restarts_under_load(env, pg, env.safekeepers))

View File

@@ -25,7 +25,7 @@ LOGGING = {
     "root": {
         "level": "INFO"
     },
-    "root.wal_acceptor_async": {
+    "root.safekeeper_async": {
        "level": "INFO"  # a lot of logs on DEBUG level
     }
 }
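A minimal sketch of how this dict takes effect, assuming the stdlib logging module: dictConfig applies the level to the named logger, and loggers created under it (such as the renamed root.safekeeper_async) inherit from their dotted-name parents:

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "loggers": {
        "root.safekeeper_async": {
            "level": "INFO"  # a lot of logs on DEBUG level
        },
    },
}
logging.config.dictConfig(LOGGING)
assert logging.getLogger("root.safekeeper_async").level == logging.INFO
```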

View File

@@ -612,7 +612,7 @@ class ZenithEnv:
         self.broker.start()
 
     def get_safekeeper_connstrs(self) -> str:
-        """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """
+        """ Get list of safekeeper endpoints suitable for safekeepers GUC """
         return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
 
     @cached_property
@@ -1484,7 +1484,7 @@ class Postgres(PgProtocol):
         """ Path to postgresql.conf """
         return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
 
-    def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
+    def adjust_for_safekeepers(self, safekeepers: str) -> 'Postgres':
         """
         Adjust instance config for working with wal acceptors instead of
         pageserver (pre-configured by CLI) directly.
@@ -1499,12 +1499,12 @@ class Postgres(PgProtocol):
             if ("synchronous_standby_names" in cfg_line or
                     # don't ask pageserver to fetch WAL from compute
                     "callmemaybe_connstring" in cfg_line or
-                    # don't repeat wal_acceptors multiple times
+                    # don't repeat safekeepers/wal_acceptors multiple times
                     "wal_acceptors" in cfg_line):
                 continue
             f.write(cfg_line)
         f.write("synchronous_standby_names = 'walproposer'\n")
-        f.write("wal_acceptors = '{}'\n".format(wal_acceptors))
+        f.write("wal_acceptors = '{}'\n".format(safekeepers))
         return self
 
     def config(self, lines: List[str]) -> 'Postgres':
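A minimal sketch of the rewrite that adjust_for_safekeepers() performs, using an in-memory list in place of a real postgresql.conf:

```python
# Stale entries that must not repeat are filtered out, then the
# safekeeper-oriented settings are appended; the ports are made up.
lines = [
    "port = '55432'\n",
    "wal_acceptors = 'localhost:1111'\n",  # stale value to drop
]
skip = ("synchronous_standby_names", "callmemaybe_connstring", "wal_acceptors")
kept = [line for line in lines if not any(key in line for key in skip)]
kept.append("synchronous_standby_names = 'walproposer'\n")
kept.append("wal_acceptors = 'localhost:5454,localhost:5455'\n")
print("".join(kept), end="")
```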

View File

@@ -13,15 +13,15 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
 @pytest.mark.parametrize('tenants_count', [1, 5, 10])
-@pytest.mark.parametrize('use_wal_acceptors', ['with_wa', 'without_wa'])
+@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa'])
 def test_bulk_tenant_create(
         zenith_env_builder: ZenithEnvBuilder,
-        use_wal_acceptors: str,
+        use_safekeepers: str,
         tenants_count: int,
         zenbenchmark,
 ):
     """Measure tenant creation time (with and without wal acceptors)"""
-    if use_wal_acceptors == 'with_wa':
+    if use_safekeepers == 'with_wa':
         zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
@@ -32,14 +32,14 @@ def test_bulk_tenant_create(
         tenant = env.zenith_cli.create_tenant()
 
         env.zenith_cli.create_timeline(
-            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)
+            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
 
         # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
-        #if use_wal_acceptors == 'with_wa':
+        #if use_safekeepers == 'with_sa':
         #    wa_factory.start_n_new(3)
 
         pg_tenant = env.postgres.create_start(
-            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)
+            f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
 
         end = timeit.default_timer()
         time_slices.append(end - start)
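A minimal sketch of the timing loop around each tenant, using timeit.default_timer as the test does; the workload here is a stand-in sleep:

```python
import time
import timeit

time_slices = []
for i in range(3):
    start = timeit.default_timer()
    time.sleep(0.01)  # stand-in for tenant + timeline creation and compute start
    end = timeit.default_timer()
    time_slices.append(end - start)
print(f"average: {sum(time_slices) / len(time_slices):.4f}s")
```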

View File

@@ -9,13 +9,13 @@ use pageserver::config::defaults::{
     DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
     DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
 };
-use std::collections::{BTreeSet, HashMap};
-use std::process::exit;
-use std::str::FromStr;
-use wal_acceptor::defaults::{
+use safekeeper::defaults::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
 };
+use std::collections::{BTreeSet, HashMap};
+use std::process::exit;
+use std::str::FromStr;
 use zenith_utils::auth::{Claims, Scope};
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::AuthType;