Compare commits

...

1 Commits

Author SHA1 Message Date
Arseny Sher
18d625e556 wip
todo: add s3 offloading progress; test epoch once its update is fixed.

ref #115
2021-06-22 18:24:27 +03:00
10 changed files with 137 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -2254,6 +2254,7 @@ dependencies = [
"regex",
"rust-s3",
"serde",
"serde_json",
"slog",
"slog-async",
"slog-scope",

View File

@@ -112,6 +112,9 @@ impl FromStr for ZTimelineId {
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
let timelineid = hex::decode(s)?;
if timelineid.len() != 16 {
return Err(hex::FromHexError::InvalidStringLength);
}
let mut buf: [u8; 16] = [0u8; 16];
buf.copy_from_slice(timelineid.as_slice());
Ok(ZTimelineId(buf))

View File

@@ -1,6 +1,7 @@
import pytest
import random
import time
import json
from contextlib import closing
from multiprocessing import Process, Value
@@ -197,3 +198,22 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu
stop_value.value = 1
proc.join()
def test_state(zenith_cli, pageserver, postgres, wa_factory):
    """Smoke-test the wal acceptor's `state` utility query.

    Starts a single wal acceptor, creates a branch with a compute node
    attached to it, restarts the compute between two writes, and then asks
    the acceptor for its state over the Postgres protocol. The returned
    value is only printed; no assertions are made on its contents.
    """
    branch = 'test_wal_acceptors_state'

    wa_factory.start_n_new(1)
    zenith_cli.run(["branch", branch, "empty"])
    pg = postgres.create_start(branch, wal_acceptors=wa_factory.get_connstrs())

    # learn zenith timeline from compute
    timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]

    # write something, restart the compute, then write again
    pg.safe_psql("create table t(i int)")
    pg.stop().start()
    pg.safe_psql("insert into t values(10)")

    # the timeline id is handed to the acceptor as a startup option
    acceptor = wa_factory.instances[0]
    conn_options = "'-c ztimelineid={}'".format(timeline_id)
    rows = acceptor.safe_psql("state", options=conn_options, sslmode='disable')
    print(rows[0][0])

View File

@@ -85,13 +85,17 @@ class PgProtocol:
self.port = port
self.username = username or getpass.getuser()
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str:
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, **kwargs) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
username = username or self.username
return f'host={self.host} port={self.port} user={username} dbname={dbname}'
connstr = f'host={self.host} port={self.port} user={username} dbname={dbname}'
for k, v in kwargs.items():
connstr += " {}={}".format(k, v)
print("connstr is {}".format(connstr))
return connstr
# autocommit=True here by default because that's what we need most of the time
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
@@ -458,9 +462,10 @@ def read_pid(path):
return int(Path(path).read_text())
class WalAcceptor:
class WalAcceptor(PgProtocol):
""" An object representing a running wal acceptor daemon. """
def __init__(self, wa_binpath, data_dir, port, num):
super().__init__(host='127.0.0.1', port=port)
self.wa_binpath = wa_binpath
self.data_dir = data_dir
self.port = port

View File

@@ -30,6 +30,7 @@ crc32c = "0.6.0"
parse_duration = "2.1.1"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
hex = "0.4.3"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor

View File

@@ -213,8 +213,8 @@ impl ReceiveWalConn {
self.peer_addr, server_info.system_id, server_info.timeline_id,
);
// FIXME: also check that the system identifier matches
self.timeline.set(server_info.timeline_id)?;
self.timeline.get().load_control_file(&self.conf)?;
self.timeline
.set(&self.conf, server_info.timeline_id, true)?;
let mut my_info = self.timeline.get().get_info();

View File

@@ -1,5 +1,5 @@
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to
//! pageserver/any other consumer.
//! pageserver/any other consumer and answering some utility queries.
//!
use crate::replication::ReplicationConn;
@@ -12,7 +12,7 @@ use std::str::FromStr;
use std::sync::Arc;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor};
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, JSON_OID, TEXT_OID};
/// Handler for streaming WAL from acceptor
pub struct SendWalHandler {
@@ -26,10 +26,14 @@ pub struct SendWalHandler {
impl postgres_backend::Handler for SendWalHandler {
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
match sm.params.get("ztimelineid") {
Some(ref ztimelineid) => {
let ztlid = ZTimelineId::from_str(ztimelineid)?;
self.timeline.set(ztlid)?;
}
Some(ref ztimelineid) => match ZTimelineId::from_str(ztimelineid) {
Ok(ztlid) => {
self.timeline.set(&self.conf, ztlid, false)?;
}
Err(e) => {
bail!("failed to parse ztimelineid: {}", e)
}
},
_ => bail!("timelineid is required"),
}
if let Some(app_name) = sm.params.get("application_name") {
@@ -41,13 +45,18 @@ impl postgres_backend::Handler for SendWalHandler {
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
self.handle_identify_system(pgb)?;
Ok(())
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
Ok(())
} else if query_string.starts_with(b"state") {
self.handle_state(pgb)?;
} else if query_string.to_ascii_lowercase().starts_with(b"set ") {
// have it because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
bail!("Unexpected command {:?}", query_string);
}
Ok(())
}
}
@@ -75,7 +84,7 @@ impl SendWalHandler {
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor {
name: b"systemid",
typoid: 25,
typoid: TEXT_OID,
typlen: -1,
..Default::default()
},
@@ -87,13 +96,13 @@ impl SendWalHandler {
},
RowDescriptor {
name: b"xlogpos",
typoid: 25,
typoid: TEXT_OID,
typlen: -1,
..Default::default()
},
RowDescriptor {
name: b"dbname",
typoid: 25,
typoid: TEXT_OID,
typlen: -1,
..Default::default()
},
@@ -107,4 +116,21 @@ impl SendWalHandler {
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
Ok(())
}
/// Send current state of safekeeper.
///
/// Replies with a single row containing one JSON-typed column named
/// `state`: the timeline info returned by `get_info()`, serialized with
/// `serde_json`.
///
/// # Errors
///
/// Returns an error if the info cannot be serialized to JSON or if writing
/// any of the reply messages fails.
fn handle_state(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
    let my_info = self.timeline.get().get_info();
    // Propagate a serialization failure to the caller instead of panicking
    // in the middle of serving a client connection.
    let state_json = serde_json::to_string(&my_info)?;

    // Exactly one row is returned, so complete the command with the same
    // "SELECT 1" tag this handler already uses elsewhere, rather than the
    // IDENTIFY_SYSTEM tag, which belongs to a different command.
    pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
        name: b"state",
        typoid: JSON_OID,
        typlen: -1,
        ..Default::default()
    }]))?
    .write_message_noflush(&BeMessage::DataRow(&[Some(state_json.as_bytes())]))?
    .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
    Ok(())
}
}

View File

@@ -55,10 +55,12 @@ impl SharedState {
}
/// Load and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
pub fn load_control_file(
&mut self,
conf: &WalAcceptorConf,
timelineid: ZTimelineId,
create: bool,
) -> Result<()> {
if self.control_file.is_some() {
info!("control file for timeline {} is already open", timelineid);
@@ -69,13 +71,17 @@ impl SharedState {
.data_dir
.join(timelineid.to_string())
.join(CONTROL_FILE_NAME);
info!("loading control file {}", control_file_path.display());
match OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&control_file_path)
{
info!(
"loading control file {}, create={}",
control_file_path.display(),
create
);
let mut opts = OpenOptions::new();
opts.read(true).write(true);
if create {
opts.create(true);
}
match opts.open(&control_file_path) {
Ok(file) => {
// Lock file to prevent two or more active wal_acceptors
match file.try_lock_exclusive() {
@@ -91,29 +97,36 @@ impl SharedState {
self.control_file = Some(file);
let cfile_ref = self.control_file.as_mut().unwrap();
match SafeKeeperInfo::des_from(cfile_ref) {
Err(e) => {
warn!("read from {:?} failed: {}", control_file_path, e);
if cfile_ref.metadata().unwrap().len() == 0 {
if !create {
bail!("control file is empty");
}
Ok(info) => {
if info.magic != SK_MAGIC {
bail!("Invalid control file magic: {}", info.magic);
} else {
match SafeKeeperInfo::des_from(cfile_ref) {
Err(e) => {
bail!("failed to read control file {:?}: {}", control_file_path, e);
}
if info.format_version != SK_FORMAT_VERSION {
bail!(
"Incompatible format version: {} vs. {}",
info.format_version,
SK_FORMAT_VERSION
);
Ok(info) => {
if info.magic != SK_MAGIC {
bail!("Invalid control file magic: {}", info.magic);
}
if info.format_version != SK_FORMAT_VERSION {
bail!(
"Incompatible format version: {} vs. {}",
info.format_version,
SK_FORMAT_VERSION
);
}
self.info = info;
}
self.info = info;
}
}
}
Err(e) => {
panic!(
bail!(
"Failed to open control file {:?}: {}",
&control_file_path, e
&control_file_path,
e
);
}
}
@@ -198,9 +211,9 @@ impl Timeline {
shared_state.hs_feedback.clone()
}
pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> {
pub fn load_control_file(&self, conf: &WalAcceptorConf, create: bool) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.load_control_file(conf, self.timelineid)
shared_state.load_control_file(conf, self.timelineid, create)
}
pub fn save_control_file(&self, sync: bool) -> Result<()> {
@@ -211,17 +224,23 @@ impl Timeline {
// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>;
fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool)
-> Result<()>;
fn get(&self) -> &Arc<Timeline>;
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
}
impl TimelineTools for Option<Arc<Timeline>> {
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> {
fn set(
&mut self,
conf: &WalAcceptorConf,
timeline_id: ZTimelineId,
create: bool,
) -> Result<()> {
// We will only set the timeline once. If it were to ever change,
// anyone who cloned the Arc would be out of date.
assert!(self.is_none());
*self = Some(GlobalTimelines::store(timeline_id)?);
*self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
Ok(())
}
@@ -243,11 +262,16 @@ lazy_static! {
}
/// A zero-sized struct used to manage access to the global timelines map.
struct GlobalTimelines;
pub struct GlobalTimelines;
impl GlobalTimelines {
/// Store a new timeline into the global TIMELINES map.
fn store(timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
/// Get a timeline with control file loaded from the global TIMELINES map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &WalAcceptorConf,
timeline_id: ZTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let mut timelines = TIMELINES.lock().unwrap();
match timelines.get(&timeline_id) {
@@ -258,9 +282,10 @@ impl GlobalTimelines {
let shared_state = SharedState::new();
let new_tid = Arc::new(Timeline::new(timeline_id, shared_state));
timelines.insert(timeline_id, Arc::clone(&new_tid));
Ok(new_tid)
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
new_tli.load_control_file(conf, create)?;
timelines.insert(timeline_id, Arc::clone(&new_tli));
Ok(new_tli)
}
}
}

View File

@@ -116,7 +116,12 @@ impl PostgresBackend {
Some(FeMessage::StartupMessage(m)) => {
trace!("got startup message {:?}", m);
handler.startup(self, &m)?;
if let Err(e) = handler.startup(self, &m) {
// try to send error to the client
let errmsg = format!("{}", e);
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
return Err(e);
}
match m.kind {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {

View File

@@ -366,6 +366,7 @@ pub struct XLogDataBody<'a> {
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
/// PostgreSQL type OID of the built-in `text` type, for use as a column's
/// `typoid` in a row description.
pub const TEXT_OID: Oid = 25;
/// PostgreSQL type OID of the built-in `json` type, for use as a column's
/// `typoid` in a row description.
pub const JSON_OID: Oid = 114;
// single text column
pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor {
name: b"data",