mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
Various fixes for test_sync_safekeepers (#668)
* Send ProposerGreeting manually in tests * Move test_sync_safekeepers to test_wal_acceptor.py * Capture test_sync_safekeepers output * Add comment for handle_json_ctrl * Save captured output in CI
This commit is contained in:
committed by
GitHub
parent
7a370394a7
commit
d6fc74a412
@@ -254,7 +254,7 @@ jobs:
|
||||
when: always
|
||||
command: |
|
||||
du -sh /tmp/test_output/*
|
||||
find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "wal_acceptor.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" -delete
|
||||
find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "wal_acceptor.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" -delete
|
||||
du -sh /tmp/test_output/*
|
||||
- store_artifacts:
|
||||
path: /tmp/test_output
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import pytest
|
||||
import random
|
||||
import time
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process, Value
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory, PgBin
|
||||
from fixtures.utils import lsn_to_hex, mkdir_if_needed
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
@@ -198,3 +202,92 @@ def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: Pos
|
||||
|
||||
stop_value.value = 1
|
||||
proc.join()
|
||||
|
||||
class ProposerPostgres:
|
||||
"""Object for running safekeepers sync with walproposer"""
|
||||
def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str):
|
||||
self.pgdata_dir: str = pgdata_dir
|
||||
self.pg_bin: PgBin = pg_bin
|
||||
self.timeline_id: str = timeline_id
|
||||
self.tenant_id: str = tenant_id
|
||||
|
||||
def pg_data_dir_path(self) -> str:
|
||||
""" Path to data directory """
|
||||
return self.pgdata_dir
|
||||
|
||||
def config_file_path(self) -> str:
|
||||
""" Path to postgresql.conf """
|
||||
return os.path.join(self.pgdata_dir, 'postgresql.conf')
|
||||
|
||||
def create_dir_config(self, wal_acceptors: str):
|
||||
""" Create dir and config for running --sync-safekeepers """
|
||||
|
||||
mkdir_if_needed(self.pg_data_dir_path())
|
||||
with open(self.config_file_path(), "w") as f:
|
||||
f.writelines([
|
||||
"synchronous_standby_names = 'walproposer'\n",
|
||||
f"zenith.zenith_timeline = '{self.timeline_id}'\n",
|
||||
f"zenith.zenith_tenant = '{self.tenant_id}'\n",
|
||||
f"wal_acceptors = '{wal_acceptors}'\n",
|
||||
])
|
||||
|
||||
def sync_safekeepers(self) -> str:
|
||||
"""
|
||||
Run 'postgres --sync-safekeepers'.
|
||||
Returns execution result, which is commit_lsn after sync.
|
||||
"""
|
||||
|
||||
command = ["postgres", "--sync-safekeepers"]
|
||||
env = {
|
||||
"PGDATA": self.pg_data_dir_path(),
|
||||
}
|
||||
|
||||
basepath = self.pg_bin.run_capture(command, env)
|
||||
stdout_filename = basepath + '.stdout'
|
||||
|
||||
with open(stdout_filename, 'r') as stdout_f:
|
||||
stdout = stdout_f.read()
|
||||
return stdout.strip("\n ")
|
||||
|
||||
|
||||
# insert wal in all safekeepers and run sync on proposer
|
||||
def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory):
|
||||
wa_factory.start_n_new(3)
|
||||
|
||||
timeline_id = uuid.uuid4().hex
|
||||
tenant_id = uuid.uuid4().hex
|
||||
|
||||
# write config for proposer
|
||||
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
|
||||
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
|
||||
pg.create_dir_config(wa_factory.get_connstrs())
|
||||
|
||||
# valid lsn, which is not in the segment start, nor in zero segment
|
||||
epoch_start_lsn = 0x16B9188 # 0/16B9188
|
||||
begin_lsn = epoch_start_lsn
|
||||
|
||||
# append and commit WAL
|
||||
lsn_after_append = []
|
||||
for i in range(3):
|
||||
res = wa_factory.instances[i].append_logical_message(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{
|
||||
"lm_prefix": "prefix",
|
||||
"lm_message": "message",
|
||||
"set_commit_lsn": True,
|
||||
"term": 2,
|
||||
"begin_lsn": begin_lsn,
|
||||
"epoch_start_lsn": epoch_start_lsn,
|
||||
"truncate_lsn": epoch_start_lsn,
|
||||
},
|
||||
)
|
||||
lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"])
|
||||
lsn_after_append.append(lsn_hex)
|
||||
print(f"safekeeper[{i}] lsn after append: {lsn_hex}")
|
||||
|
||||
# run sync safekeepers
|
||||
lsn_after_sync = pg.sync_safekeepers()
|
||||
print(f"lsn after sync = {lsn_after_sync}")
|
||||
|
||||
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
|
||||
|
||||
@@ -1,105 +0,0 @@
|
||||
import os
|
||||
import pytest
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
from fixtures.zenith_fixtures import WalAcceptorFactory, PgBin
|
||||
from fixtures.utils import lsn_to_hex, mkdir_if_needed
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
|
||||
class ProposerPostgres:
|
||||
"""Object for running safekeepers sync with walproposer"""
|
||||
def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str):
|
||||
self.pgdata_dir: str = pgdata_dir
|
||||
self.pg_bin: PgBin = pg_bin
|
||||
self.timeline_id: str = timeline_id
|
||||
self.tenant_id: str = tenant_id
|
||||
|
||||
def pg_data_dir_path(self) -> str:
|
||||
""" Path to data directory """
|
||||
return self.pgdata_dir
|
||||
|
||||
def config_file_path(self) -> str:
|
||||
""" Path to postgresql.conf """
|
||||
return os.path.join(self.pgdata_dir, 'postgresql.conf')
|
||||
|
||||
def create_dir_config(self, wal_acceptors: str):
|
||||
""" Create dir and config for running --sync-safekeepers """
|
||||
|
||||
mkdir_if_needed(self.pg_data_dir_path())
|
||||
with open(self.config_file_path(), "w") as f:
|
||||
f.write("zenith.zenith_timeline = '{}'\n".format(self.timeline_id))
|
||||
f.write("zenith.zenith_tenant = '{}'\n".format(self.tenant_id))
|
||||
f.write("synchronous_standby_names = '{}'\n".format("walproposer"))
|
||||
f.write("wal_acceptors = '{}'\n".format(wal_acceptors))
|
||||
|
||||
def sync_safekeepers(self) -> subprocess.CompletedProcess:
|
||||
"""
|
||||
Run 'postgres --sync-safekeepers'.
|
||||
Returns execution result, which is commit_lsn after sync.
|
||||
"""
|
||||
|
||||
pg_path = os.path.join(self.pg_bin.pg_bin_path, "postgres")
|
||||
command = [pg_path, "--sync-safekeepers"]
|
||||
env = {
|
||||
"PGDATA": self.pg_data_dir_path(),
|
||||
}
|
||||
|
||||
print('Running command "{}"'.format(" ".join(command)))
|
||||
res = subprocess.run(
|
||||
command, env=env, check=True, text=True, stdout=subprocess.PIPE
|
||||
)
|
||||
|
||||
return res.stdout.strip("\n ")
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="not stable enough")
|
||||
# insert wal in all safekeepers and run sync on proposer
|
||||
def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory):
|
||||
wa_factory.start_n_new(3)
|
||||
|
||||
timeline_id = uuid.uuid4().hex
|
||||
tenant_id = uuid.uuid4().hex
|
||||
|
||||
# write config for proposer
|
||||
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
|
||||
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
|
||||
pg.create_dir_config(wa_factory.get_connstrs())
|
||||
|
||||
# run sync to init safekeepers with ProposerGreeting
|
||||
initial_lsn = pg.sync_safekeepers()
|
||||
|
||||
# should be 0/0 for empty safekeepers
|
||||
assert initial_lsn == "0/0"
|
||||
|
||||
# valid lsn, which is not in the segment start, nor in zero segment
|
||||
epoch_start_lsn = 0x16B9188 # 0/16B9188
|
||||
begin_lsn = epoch_start_lsn
|
||||
|
||||
# append and commit WAL
|
||||
lsn_after_append = []
|
||||
for i in range(3):
|
||||
res = wa_factory.instances[i].append_logical_message(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{
|
||||
"lm_prefix": "prefix",
|
||||
"lm_message": "message",
|
||||
"set_commit_lsn": True,
|
||||
"term": 2,
|
||||
"begin_lsn": begin_lsn,
|
||||
"epoch_start_lsn": epoch_start_lsn,
|
||||
"truncate_lsn": epoch_start_lsn,
|
||||
},
|
||||
)
|
||||
lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"])
|
||||
lsn_after_append.append(lsn_hex)
|
||||
print(f"safekeeper[{i}] lsn after append: {lsn_hex}")
|
||||
|
||||
# run sync safekeepers
|
||||
lsn_after_sync = pg.sync_safekeepers()
|
||||
print(f"lsn after sync = {lsn_after_sync}")
|
||||
|
||||
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
|
||||
@@ -21,7 +21,7 @@ def mkdir_if_needed(path: str) -> None:
|
||||
assert os.path.isdir(path)
|
||||
|
||||
|
||||
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None:
|
||||
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
|
||||
""" Run a process and capture its output
|
||||
|
||||
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
|
||||
@@ -29,6 +29,7 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None:
|
||||
counter.
|
||||
|
||||
If those files already exist, we will overwrite them.
|
||||
Returns basepath for files with captured output.
|
||||
"""
|
||||
assert type(cmd) is list
|
||||
base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
|
||||
@@ -41,6 +42,8 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None:
|
||||
print('(capturing output to "{}.stdout")'.format(base))
|
||||
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
|
||||
|
||||
return basepath
|
||||
|
||||
|
||||
_global_counter = 0
|
||||
|
||||
|
||||
@@ -486,17 +486,19 @@ class PgBin:
|
||||
def run_capture(self,
|
||||
command: List[str],
|
||||
env: Optional[Env] = None,
|
||||
cwd: Optional[str] = None) -> None:
|
||||
cwd: Optional[str] = None,
|
||||
**kwargs: Any) -> None:
|
||||
"""
|
||||
Run one of the postgres binaries, with stderr and stdout redirected to a file.
|
||||
|
||||
This is just like `run`, but for chatty programs.
|
||||
This is just like `run`, but for chatty programs. Returns basepath for files
|
||||
with captured output.
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(' '.join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True)
|
||||
return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs)
|
||||
|
||||
|
||||
@zenfixture
|
||||
|
||||
@@ -13,7 +13,9 @@ use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{AppendRequest, AppendRequestHeader, ProposerAcceptorMessage};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term};
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::TimelineTools;
|
||||
@@ -48,6 +50,9 @@ struct AppendResult {
|
||||
inserted_wal: InsertedWAL,
|
||||
}
|
||||
|
||||
/// Handles command to craft logical message WAL record with given
|
||||
/// content, and then append it with specified term and lsn. This
|
||||
/// function is used to test safekeepers in different scenarios.
|
||||
pub fn handle_json_ctrl(
|
||||
swh: &mut SendWalHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
@@ -62,6 +67,9 @@ pub fn handle_json_ctrl(
|
||||
let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?;
|
||||
info!("JSON_CTRL request: {:?}", append_request);
|
||||
|
||||
// need to init safekeeper state before AppendRequest
|
||||
prepare_safekeeper(swh)?;
|
||||
|
||||
let inserted_wal = append_logical_message(swh, append_request)?;
|
||||
let response = AppendResult {
|
||||
state: swh.timeline.get().get_info(),
|
||||
@@ -80,6 +88,27 @@ pub fn handle_json_ctrl(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepare safekeeper to process append requests without crashes,
|
||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||
fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
|
||||
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
||||
protocol_version: 1, // current protocol
|
||||
pg_version: 0, // unknown
|
||||
proposer_id: [0u8; 16],
|
||||
system_id: 0,
|
||||
ztli: swh.timelineid.unwrap(),
|
||||
tenant_id: swh.tenantid.unwrap(),
|
||||
tli: 0,
|
||||
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
|
||||
});
|
||||
|
||||
let response = swh.timeline.get().process_msg(&greeting_request)?;
|
||||
match response {
|
||||
AcceptorProposerMessage::Greeting(_) => Ok(()),
|
||||
_ => anyhow::bail!("not GreetingResponse"),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct InsertedWAL {
|
||||
begin_lsn: Lsn,
|
||||
@@ -122,7 +151,7 @@ fn append_logical_message(
|
||||
|
||||
let append_response = match response {
|
||||
AcceptorProposerMessage::AppendResponse(resp) => resp,
|
||||
_ => return Err(anyhow!("not AppendResponse")),
|
||||
_ => anyhow::bail!("not AppendResponse"),
|
||||
};
|
||||
|
||||
Ok(InsertedWAL {
|
||||
@@ -148,9 +177,9 @@ impl XlLogicalMessage {
|
||||
}
|
||||
}
|
||||
|
||||
// Create new WAL record for non-transactional logical message.
|
||||
// Used for creating artificial WAL for tests, as LogicalMessage
|
||||
// record is basically no-op.
|
||||
/// Create new WAL record for non-transactional logical message.
|
||||
/// Used for creating artificial WAL for tests, as LogicalMessage
|
||||
/// record is basically no-op.
|
||||
fn encode_logical_message(prefix: String, message: String) -> Vec<u8> {
|
||||
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
|
||||
prefix_bytes.put(prefix.as_bytes());
|
||||
|
||||
@@ -50,9 +50,11 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
}
|
||||
|
||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||
// START_WAL_PUSH is the only command that initializes the timeline
|
||||
// START_WAL_PUSH is the only command that initializes the timeline in production.
|
||||
// There is also JSON_CTRL command, which should initialize the timeline for testing.
|
||||
if self.timeline.is_none() {
|
||||
if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL")
|
||||
{
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
self.tenantid.unwrap(),
|
||||
|
||||
Reference in New Issue
Block a user