From 18d625e5563fbdf873c2f25f4a11074db130a8c6 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 22 Jun 2021 18:23:09 +0300 Subject: [PATCH] wip todo: add s3 offloading progress, test epoch once its update will be fixed. ref #115 --- Cargo.lock | 1 + pageserver/src/lib.rs | 3 + test_runner/batch_others/test_wal_acceptor.py | 20 ++++ test_runner/fixtures/zenith_fixtures.py | 11 ++- walkeeper/Cargo.toml | 1 + walkeeper/src/receive_wal.rs | 4 +- walkeeper/src/send_wal.rs | 48 +++++++--- walkeeper/src/timeline.rs | 91 ++++++++++++------- zenith_utils/src/postgres_backend.rs | 7 +- zenith_utils/src/pq_proto.rs | 1 + 10 files changed, 137 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a130d30b6..bd4a99e0e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2254,6 +2254,7 @@ dependencies = [ "regex", "rust-s3", "serde", + "serde_json", "slog", "slog-async", "slog-scope", diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 93563a0f7b..bc841fa79d 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -112,6 +112,9 @@ impl FromStr for ZTimelineId { fn from_str(s: &str) -> Result { let timelineid = hex::decode(s)?; + if timelineid.len() != 16 { + return Err(hex::FromHexError::InvalidStringLength); + } let mut buf: [u8; 16] = [0u8; 16]; buf.copy_from_slice(timelineid.as_slice()); Ok(ZTimelineId(buf)) diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index bf5a7f19e7..a47f7d1b87 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -1,6 +1,7 @@ import pytest import random import time +import json from contextlib import closing from multiprocessing import Process, Value @@ -197,3 +198,22 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu stop_value.value = 1 proc.join() + +def test_state(zenith_cli, pageserver, postgres, wa_factory): + wa_factory.start_n_new(1) + + zenith_cli.run(["branch", "test_wal_acceptors_state", "empty"]) + pg = postgres.create_start('test_wal_acceptors_state', + wal_acceptors=wa_factory.get_connstrs()) + + # learn zenith timeline from compute + ztli = pg.safe_psql("show zenith.zenith_timeline")[0][0] + + pg.safe_psql("create table t(i int)") + + pg.stop().start() + pg.safe_psql("insert into t values(10)") + + wa = wa_factory.instances[0] + state = wa.safe_psql("state", options="'-c ztimelineid={}'".format(ztli), sslmode='disable')[0][0] + print(state) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index aa1557752c..f6484d974f 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -85,13 +85,17 @@ class PgProtocol: self.port = port self.username = username or getpass.getuser() - def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str: + def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, **kwargs) -> str: """ Build a libpq connection string for the Postgres instance. """ username = username or self.username - return f'host={self.host} port={self.port} user={username} dbname={dbname}' + connstr = f'host={self.host} port={self.port} user={username} dbname={dbname}' + for k, v in kwargs.items(): + connstr += " {}={}".format(k, v) + print("connstr is {}".format(connstr)) + return connstr # autocommit=True here by default because that's what we need most of the time def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection: @@ -458,9 +462,10 @@ def read_pid(path): return int(Path(path).read_text()) -class WalAcceptor: +class WalAcceptor(PgProtocol): """ An object representing a running wal acceptor daemon. """ def __init__(self, wa_binpath, data_dir, port, num): + super().__init__(host='127.0.0.1', port=port) self.wa_binpath = wa_binpath self.data_dir = data_dir self.port = port diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 878ce491aa..23eb86a401 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -30,6 +30,7 @@ crc32c = "0.6.0" parse_duration = "2.1.1" walkdir = "2" serde = { version = "1.0", features = ["derive"] } +serde_json = "1" hex = "0.4.3" # FIXME: 'pageserver' is needed for ZTimelineId. Refactor diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 55aa2e6c29..7c286bc3db 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -213,8 +213,8 @@ impl ReceiveWalConn { self.peer_addr, server_info.system_id, server_info.timeline_id, ); // FIXME: also check that the system identifier matches - self.timeline.set(server_info.timeline_id)?; - self.timeline.get().load_control_file(&self.conf)?; + self.timeline + .set(&self.conf, server_info.timeline_id, true)?; let mut my_info = self.timeline.get().get_info(); diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 711be4b9ca..83702dce77 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -1,5 +1,5 @@ //! Part of WAL acceptor pretending to be Postgres, streaming xlog to -//! pageserver/any other consumer. +//! pageserver/any other consumer and answering to some utility queries. //! use crate::replication::ReplicationConn; @@ -12,7 +12,7 @@ use std::str::FromStr; use std::sync::Arc; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor}; +use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, JSON_OID, TEXT_OID}; /// Handler for streaming WAL from acceptor pub struct SendWalHandler { @@ -26,10 +26,14 @@ pub struct SendWalHandler { impl postgres_backend::Handler for SendWalHandler { fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { match sm.params.get("ztimelineid") { - Some(ref ztimelineid) => { - let ztlid = ZTimelineId::from_str(ztimelineid)?; - self.timeline.set(ztlid)?; - } + Some(ref ztimelineid) => match ZTimelineId::from_str(ztimelineid) { + Ok(ztlid) => { + self.timeline.set(&self.conf, ztlid, false)?; + } + Err(e) => { + bail!("failed to parse ztimelineid: {}", e) + } + }, _ => bail!("timelineid is required"), } if let Some(app_name) = sm.params.get("application_name") { @@ -41,13 +45,18 @@ impl postgres_backend::Handler for SendWalHandler { fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { if query_string.starts_with(b"IDENTIFY_SYSTEM") { self.handle_identify_system(pgb)?; - Ok(()) } else if query_string.starts_with(b"START_REPLICATION") { ReplicationConn::new(pgb).run(self, pgb, &query_string)?; - Ok(()) + } else if query_string.starts_with(b"state") { + self.handle_state(pgb)?; + } else if query_string.to_ascii_lowercase().starts_with(b"set ") { + // have it because psycopg2 executes "SET datestyle TO 'ISO'" + // on connect + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { bail!("Unexpected command {:?}", query_string); } + Ok(()) } } @@ -75,7 +84,7 @@ impl SendWalHandler { pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor { name: b"systemid", - typoid: 25, + typoid: TEXT_OID, typlen: -1, ..Default::default() }, @@ -87,13 +96,13 @@ impl SendWalHandler { }, RowDescriptor { name: b"xlogpos", - typoid: 25, + typoid: TEXT_OID, typlen: -1, ..Default::default() }, RowDescriptor { name: b"dbname", - typoid: 25, + typoid: TEXT_OID, typlen: -1, ..Default::default() }, @@ -107,4 +116,21 @@ impl SendWalHandler { .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?; Ok(()) } + + /// Send current state of safekeeper + fn handle_state(&mut self, pgb: &mut PostgresBackend) -> Result<()> { + let my_info = self.timeline.get().get_info(); + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor { + name: b"state", + typoid: JSON_OID, + typlen: -1, + ..Default::default() + }]))? + .write_message_noflush(&BeMessage::DataRow(&[Some( + serde_json::to_string(&my_info).unwrap().as_bytes(), + )]))? + .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?; + + Ok(()) + } } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 70cf0d5bf0..29eb35900a 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -55,10 +55,12 @@ impl SharedState { } /// Load and lock control file (prevent running more than one instance of safekeeper) + /// If create=false and file doesn't exist, bails out. pub fn load_control_file( &mut self, conf: &WalAcceptorConf, timelineid: ZTimelineId, + create: bool, ) -> Result<()> { if self.control_file.is_some() { info!("control file for timeline {} is already open", timelineid); @@ -69,13 +71,17 @@ impl SharedState { .data_dir .join(timelineid.to_string()) .join(CONTROL_FILE_NAME); - info!("loading control file {}", control_file_path.display()); - match OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&control_file_path) - { + info!( + "loading control file {}, create={}", + control_file_path.display(), + create + ); + let mut opts = OpenOptions::new(); + opts.read(true).write(true); + if create { + opts.create(true); + } + match opts.open(&control_file_path) { Ok(file) => { // Lock file to prevent two or more active wal_acceptors match file.try_lock_exclusive() { @@ -91,29 +97,36 @@ impl SharedState { self.control_file = Some(file); let cfile_ref = self.control_file.as_mut().unwrap(); - match SafeKeeperInfo::des_from(cfile_ref) { - Err(e) => { - warn!("read from {:?} failed: {}", control_file_path, e); + if cfile_ref.metadata().unwrap().len() == 0 { + if !create { + bail!("control file is empty"); } - Ok(info) => { - if info.magic != SK_MAGIC { - bail!("Invalid control file magic: {}", info.magic); + } else { + match SafeKeeperInfo::des_from(cfile_ref) { + Err(e) => { + bail!("failed to read control file {:?}: {}", control_file_path, e); } - if info.format_version != SK_FORMAT_VERSION { - bail!( - "Incompatible format version: {} vs. {}", - info.format_version, - SK_FORMAT_VERSION - ); + Ok(info) => { + if info.magic != SK_MAGIC { + bail!("Invalid control file magic: {}", info.magic); + } + if info.format_version != SK_FORMAT_VERSION { + bail!( + "Incompatible format version: {} vs. {}", + info.format_version, + SK_FORMAT_VERSION + ); + } + self.info = info; } - self.info = info; } } } Err(e) => { - panic!( + bail!( "Failed to open control file {:?}: {}", - &control_file_path, e + &control_file_path, + e ); } } @@ -198,9 +211,9 @@ impl Timeline { shared_state.hs_feedback.clone() } - pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { + pub fn load_control_file(&self, conf: &WalAcceptorConf, create: bool) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); - shared_state.load_control_file(conf, self.timelineid) + shared_state.load_control_file(conf, self.timelineid, create) } pub fn save_control_file(&self, sync: bool) -> Result<()> { @@ -211,17 +224,23 @@ impl Timeline { // Utilities needed by various Connection-like objects pub trait TimelineTools { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; + fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool) + -> Result<()>; fn get(&self) -> &Arc; fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); } impl TimelineTools for Option> { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> { + fn set( + &mut self, + conf: &WalAcceptorConf, + timeline_id: ZTimelineId, + create: bool, + ) -> Result<()> { // We will only set the timeline once. If it were to ever change, // anyone who cloned the Arc would be out of date. assert!(self.is_none()); - *self = Some(GlobalTimelines::store(timeline_id)?); + *self = Some(GlobalTimelines::get(conf, timeline_id, create)?); Ok(()) } @@ -243,11 +262,16 @@ lazy_static! { } /// A zero-sized struct used to manage access to the global timelines map. -struct GlobalTimelines; +pub struct GlobalTimelines; impl GlobalTimelines { - /// Store a new timeline into the global TIMELINES map. - fn store(timeline_id: ZTimelineId) -> Result> { + /// Get a timeline with control file loaded from the global TIMELINES map. + /// If control file doesn't exist and create=false, bails out. + pub fn get( + conf: &WalAcceptorConf, + timeline_id: ZTimelineId, + create: bool, + ) -> Result> { let mut timelines = TIMELINES.lock().unwrap(); match timelines.get(&timeline_id) { @@ -258,9 +282,10 @@ impl GlobalTimelines { let shared_state = SharedState::new(); - let new_tid = Arc::new(Timeline::new(timeline_id, shared_state)); - timelines.insert(timeline_id, Arc::clone(&new_tid)); - Ok(new_tid) + let new_tli = Arc::new(Timeline::new(timeline_id, shared_state)); + new_tli.load_control_file(conf, create)?; + timelines.insert(timeline_id, Arc::clone(&new_tli)); + Ok(new_tli) } } } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 78993af761..ee9c330563 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -116,7 +116,12 @@ impl PostgresBackend { Some(FeMessage::StartupMessage(m)) => { trace!("got startup message {:?}", m); - handler.startup(self, &m)?; + if let Err(e) = handler.startup(self, &m) { + // try to send error to the client + let errmsg = format!("{}", e); + self.write_message(&BeMessage::ErrorResponse(errmsg))?; + return Err(e); + } match m.kind { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index 77d73d50da..168182ac54 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -366,6 +366,7 @@ pub struct XLogDataBody<'a> { pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); pub const TEXT_OID: Oid = 25; +pub const JSON_OID: Oid = 114; // single text column pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor { name: b"data",