Compare commits

...

1 Commit

Author SHA1 Message Date
Arseny Sher
18d625e556 wip
todo: add s3 offloading progress; test epoch once its update is fixed.

ref #115
2021-06-22 18:24:27 +03:00
10 changed files with 137 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -2254,6 +2254,7 @@ dependencies = [
"regex", "regex",
"rust-s3", "rust-s3",
"serde", "serde",
"serde_json",
"slog", "slog",
"slog-async", "slog-async",
"slog-scope", "slog-scope",

View File

@@ -112,6 +112,9 @@ impl FromStr for ZTimelineId {
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> { fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
let timelineid = hex::decode(s)?; let timelineid = hex::decode(s)?;
if timelineid.len() != 16 {
return Err(hex::FromHexError::InvalidStringLength);
}
let mut buf: [u8; 16] = [0u8; 16]; let mut buf: [u8; 16] = [0u8; 16];
buf.copy_from_slice(timelineid.as_slice()); buf.copy_from_slice(timelineid.as_slice());
Ok(ZTimelineId(buf)) Ok(ZTimelineId(buf))

View File

@@ -1,6 +1,7 @@
import pytest import pytest
import random import random
import time import time
import json
from contextlib import closing from contextlib import closing
from multiprocessing import Process, Value from multiprocessing import Process, Value
@@ -197,3 +198,22 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu
stop_value.value = 1 stop_value.value = 1
proc.join() proc.join()
def test_state(zenith_cli, pageserver, postgres, wa_factory):
wa_factory.start_n_new(1)
zenith_cli.run(["branch", "test_wal_acceptors_state", "empty"])
pg = postgres.create_start('test_wal_acceptors_state',
wal_acceptors=wa_factory.get_connstrs())
# learn zenith timeline from compute
ztli = pg.safe_psql("show zenith.zenith_timeline")[0][0]
pg.safe_psql("create table t(i int)")
pg.stop().start()
pg.safe_psql("insert into t values(10)")
wa = wa_factory.instances[0]
state = wa.safe_psql("state", options="'-c ztimelineid={}'".format(ztli), sslmode='disable')[0][0]
print(state)

View File

@@ -85,13 +85,17 @@ class PgProtocol:
self.port = port self.port = port
self.username = username or getpass.getuser() self.username = username or getpass.getuser()
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str: def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, **kwargs) -> str:
""" """
Build a libpq connection string for the Postgres instance. Build a libpq connection string for the Postgres instance.
""" """
username = username or self.username username = username or self.username
return f'host={self.host} port={self.port} user={username} dbname={dbname}' connstr = f'host={self.host} port={self.port} user={username} dbname={dbname}'
for k, v in kwargs.items():
connstr += " {}={}".format(k, v)
print("connstr is {}".format(connstr))
return connstr
# autocommit=True here by default because that's what we need most of the time # autocommit=True here by default because that's what we need most of the time
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection: def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
@@ -458,9 +462,10 @@ def read_pid(path):
return int(Path(path).read_text()) return int(Path(path).read_text())
class WalAcceptor: class WalAcceptor(PgProtocol):
""" An object representing a running wal acceptor daemon. """ """ An object representing a running wal acceptor daemon. """
def __init__(self, wa_binpath, data_dir, port, num): def __init__(self, wa_binpath, data_dir, port, num):
super().__init__(host='127.0.0.1', port=port)
self.wa_binpath = wa_binpath self.wa_binpath = wa_binpath
self.data_dir = data_dir self.data_dir = data_dir
self.port = port self.port = port

View File

@@ -30,6 +30,7 @@ crc32c = "0.6.0"
parse_duration = "2.1.1" parse_duration = "2.1.1"
walkdir = "2" walkdir = "2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
hex = "0.4.3" hex = "0.4.3"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor # FIXME: 'pageserver' is needed for ZTimelineId. Refactor

View File

@@ -213,8 +213,8 @@ impl ReceiveWalConn {
self.peer_addr, server_info.system_id, server_info.timeline_id, self.peer_addr, server_info.system_id, server_info.timeline_id,
); );
// FIXME: also check that the system identifier matches // FIXME: also check that the system identifier matches
self.timeline.set(server_info.timeline_id)?; self.timeline
self.timeline.get().load_control_file(&self.conf)?; .set(&self.conf, server_info.timeline_id, true)?;
let mut my_info = self.timeline.get().get_info(); let mut my_info = self.timeline.get().get_info();

View File

@@ -1,5 +1,5 @@
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to //! Part of WAL acceptor pretending to be Postgres, streaming xlog to
//! pageserver/any other consumer. //! pageserver/any other consumer and answering to some utility queries.
//! //!
use crate::replication::ReplicationConn; use crate::replication::ReplicationConn;
@@ -12,7 +12,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use zenith_utils::postgres_backend; use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor}; use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, JSON_OID, TEXT_OID};
/// Handler for streaming WAL from acceptor /// Handler for streaming WAL from acceptor
pub struct SendWalHandler { pub struct SendWalHandler {
@@ -26,10 +26,14 @@ pub struct SendWalHandler {
impl postgres_backend::Handler for SendWalHandler { impl postgres_backend::Handler for SendWalHandler {
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
match sm.params.get("ztimelineid") { match sm.params.get("ztimelineid") {
Some(ref ztimelineid) => { Some(ref ztimelineid) => match ZTimelineId::from_str(ztimelineid) {
let ztlid = ZTimelineId::from_str(ztimelineid)?; Ok(ztlid) => {
self.timeline.set(ztlid)?; self.timeline.set(&self.conf, ztlid, false)?;
} }
Err(e) => {
bail!("failed to parse ztimelineid: {}", e)
}
},
_ => bail!("timelineid is required"), _ => bail!("timelineid is required"),
} }
if let Some(app_name) = sm.params.get("application_name") { if let Some(app_name) = sm.params.get("application_name") {
@@ -41,13 +45,18 @@ impl postgres_backend::Handler for SendWalHandler {
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
if query_string.starts_with(b"IDENTIFY_SYSTEM") { if query_string.starts_with(b"IDENTIFY_SYSTEM") {
self.handle_identify_system(pgb)?; self.handle_identify_system(pgb)?;
Ok(())
} else if query_string.starts_with(b"START_REPLICATION") { } else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?; ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
Ok(()) } else if query_string.starts_with(b"state") {
self.handle_state(pgb)?;
} else if query_string.to_ascii_lowercase().starts_with(b"set ") {
// have it because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else { } else {
bail!("Unexpected command {:?}", query_string); bail!("Unexpected command {:?}", query_string);
} }
Ok(())
} }
} }
@@ -75,7 +84,7 @@ impl SendWalHandler {
pgb.write_message_noflush(&BeMessage::RowDescription(&[ pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor { RowDescriptor {
name: b"systemid", name: b"systemid",
typoid: 25, typoid: TEXT_OID,
typlen: -1, typlen: -1,
..Default::default() ..Default::default()
}, },
@@ -87,13 +96,13 @@ impl SendWalHandler {
}, },
RowDescriptor { RowDescriptor {
name: b"xlogpos", name: b"xlogpos",
typoid: 25, typoid: TEXT_OID,
typlen: -1, typlen: -1,
..Default::default() ..Default::default()
}, },
RowDescriptor { RowDescriptor {
name: b"dbname", name: b"dbname",
typoid: 25, typoid: TEXT_OID,
typlen: -1, typlen: -1,
..Default::default() ..Default::default()
}, },
@@ -107,4 +116,21 @@ impl SendWalHandler {
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?; .write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
Ok(()) Ok(())
} }
/// Send current state of safekeeper
fn handle_state(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
let my_info = self.timeline.get().get_info();
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
name: b"state",
typoid: JSON_OID,
typlen: -1,
..Default::default()
}]))?
.write_message_noflush(&BeMessage::DataRow(&[Some(
serde_json::to_string(&my_info).unwrap().as_bytes(),
)]))?
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
Ok(())
}
} }

View File

@@ -55,10 +55,12 @@ impl SharedState {
} }
/// Load and lock control file (prevent running more than one instance of safekeeper) /// Load and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
pub fn load_control_file( pub fn load_control_file(
&mut self, &mut self,
conf: &WalAcceptorConf, conf: &WalAcceptorConf,
timelineid: ZTimelineId, timelineid: ZTimelineId,
create: bool,
) -> Result<()> { ) -> Result<()> {
if self.control_file.is_some() { if self.control_file.is_some() {
info!("control file for timeline {} is already open", timelineid); info!("control file for timeline {} is already open", timelineid);
@@ -69,13 +71,17 @@ impl SharedState {
.data_dir .data_dir
.join(timelineid.to_string()) .join(timelineid.to_string())
.join(CONTROL_FILE_NAME); .join(CONTROL_FILE_NAME);
info!("loading control file {}", control_file_path.display()); info!(
match OpenOptions::new() "loading control file {}, create={}",
.read(true) control_file_path.display(),
.write(true) create
.create(true) );
.open(&control_file_path) let mut opts = OpenOptions::new();
{ opts.read(true).write(true);
if create {
opts.create(true);
}
match opts.open(&control_file_path) {
Ok(file) => { Ok(file) => {
// Lock file to prevent two or more active wal_acceptors // Lock file to prevent two or more active wal_acceptors
match file.try_lock_exclusive() { match file.try_lock_exclusive() {
@@ -91,29 +97,36 @@ impl SharedState {
self.control_file = Some(file); self.control_file = Some(file);
let cfile_ref = self.control_file.as_mut().unwrap(); let cfile_ref = self.control_file.as_mut().unwrap();
match SafeKeeperInfo::des_from(cfile_ref) { if cfile_ref.metadata().unwrap().len() == 0 {
Err(e) => { if !create {
warn!("read from {:?} failed: {}", control_file_path, e); bail!("control file is empty");
} }
Ok(info) => { } else {
if info.magic != SK_MAGIC { match SafeKeeperInfo::des_from(cfile_ref) {
bail!("Invalid control file magic: {}", info.magic); Err(e) => {
bail!("failed to read control file {:?}: {}", control_file_path, e);
} }
if info.format_version != SK_FORMAT_VERSION { Ok(info) => {
bail!( if info.magic != SK_MAGIC {
"Incompatible format version: {} vs. {}", bail!("Invalid control file magic: {}", info.magic);
info.format_version, }
SK_FORMAT_VERSION if info.format_version != SK_FORMAT_VERSION {
); bail!(
"Incompatible format version: {} vs. {}",
info.format_version,
SK_FORMAT_VERSION
);
}
self.info = info;
} }
self.info = info;
} }
} }
} }
Err(e) => { Err(e) => {
panic!( bail!(
"Failed to open control file {:?}: {}", "Failed to open control file {:?}: {}",
&control_file_path, e &control_file_path,
e
); );
} }
} }
@@ -198,9 +211,9 @@ impl Timeline {
shared_state.hs_feedback.clone() shared_state.hs_feedback.clone()
} }
pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { pub fn load_control_file(&self, conf: &WalAcceptorConf, create: bool) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap(); let mut shared_state = self.mutex.lock().unwrap();
shared_state.load_control_file(conf, self.timelineid) shared_state.load_control_file(conf, self.timelineid, create)
} }
pub fn save_control_file(&self, sync: bool) -> Result<()> { pub fn save_control_file(&self, sync: bool) -> Result<()> {
@@ -211,17 +224,23 @@ impl Timeline {
// Utilities needed by various Connection-like objects // Utilities needed by various Connection-like objects
pub trait TimelineTools { pub trait TimelineTools {
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool)
-> Result<()>;
fn get(&self) -> &Arc<Timeline>; fn get(&self) -> &Arc<Timeline>;
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
} }
impl TimelineTools for Option<Arc<Timeline>> { impl TimelineTools for Option<Arc<Timeline>> {
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> { fn set(
&mut self,
conf: &WalAcceptorConf,
timeline_id: ZTimelineId,
create: bool,
) -> Result<()> {
// We will only set the timeline once. If it were to ever change, // We will only set the timeline once. If it were to ever change,
// anyone who cloned the Arc would be out of date. // anyone who cloned the Arc would be out of date.
assert!(self.is_none()); assert!(self.is_none());
*self = Some(GlobalTimelines::store(timeline_id)?); *self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
Ok(()) Ok(())
} }
@@ -243,11 +262,16 @@ lazy_static! {
} }
/// A zero-sized struct used to manage access to the global timelines map. /// A zero-sized struct used to manage access to the global timelines map.
struct GlobalTimelines; pub struct GlobalTimelines;
impl GlobalTimelines { impl GlobalTimelines {
/// Store a new timeline into the global TIMELINES map. /// Get a timeline with control file loaded from the global TIMELINES map.
fn store(timeline_id: ZTimelineId) -> Result<Arc<Timeline>> { /// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &WalAcceptorConf,
timeline_id: ZTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let mut timelines = TIMELINES.lock().unwrap(); let mut timelines = TIMELINES.lock().unwrap();
match timelines.get(&timeline_id) { match timelines.get(&timeline_id) {
@@ -258,9 +282,10 @@ impl GlobalTimelines {
let shared_state = SharedState::new(); let shared_state = SharedState::new();
let new_tid = Arc::new(Timeline::new(timeline_id, shared_state)); let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
timelines.insert(timeline_id, Arc::clone(&new_tid)); new_tli.load_control_file(conf, create)?;
Ok(new_tid) timelines.insert(timeline_id, Arc::clone(&new_tli));
Ok(new_tli)
} }
} }
} }

View File

@@ -116,7 +116,12 @@ impl PostgresBackend {
Some(FeMessage::StartupMessage(m)) => { Some(FeMessage::StartupMessage(m)) => {
trace!("got startup message {:?}", m); trace!("got startup message {:?}", m);
handler.startup(self, &m)?; if let Err(e) = handler.startup(self, &m) {
// try to send error to the client
let errmsg = format!("{}", e);
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
return Err(e);
}
match m.kind { match m.kind {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {

View File

@@ -366,6 +366,7 @@ pub struct XLogDataBody<'a> {
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
pub const TEXT_OID: Oid = 25; pub const TEXT_OID: Oid = 25;
pub const JSON_OID: Oid = 114;
// single text column // single text column
pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor { pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor {
name: b"data", name: b"data",