mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-21 04:12:55 +00:00
Compare commits
1 Commits
fix_list_p
...
wal_accept
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18d625e556 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2254,6 +2254,7 @@ dependencies = [
|
||||
"regex",
|
||||
"rust-s3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"slog",
|
||||
"slog-async",
|
||||
"slog-scope",
|
||||
|
||||
@@ -112,6 +112,9 @@ impl FromStr for ZTimelineId {
|
||||
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
|
||||
let timelineid = hex::decode(s)?;
|
||||
|
||||
if timelineid.len() != 16 {
|
||||
return Err(hex::FromHexError::InvalidStringLength);
|
||||
}
|
||||
let mut buf: [u8; 16] = [0u8; 16];
|
||||
buf.copy_from_slice(timelineid.as_slice());
|
||||
Ok(ZTimelineId(buf))
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
import random
|
||||
import time
|
||||
import json
|
||||
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process, Value
|
||||
@@ -197,3 +198,22 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu
|
||||
|
||||
stop_value.value = 1
|
||||
proc.join()
|
||||
|
||||
def test_state(zenith_cli, pageserver, postgres, wa_factory):
|
||||
wa_factory.start_n_new(1)
|
||||
|
||||
zenith_cli.run(["branch", "test_wal_acceptors_state", "empty"])
|
||||
pg = postgres.create_start('test_wal_acceptors_state',
|
||||
wal_acceptors=wa_factory.get_connstrs())
|
||||
|
||||
# learn zenith timeline from compute
|
||||
ztli = pg.safe_psql("show zenith.zenith_timeline")[0][0]
|
||||
|
||||
pg.safe_psql("create table t(i int)")
|
||||
|
||||
pg.stop().start()
|
||||
pg.safe_psql("insert into t values(10)")
|
||||
|
||||
wa = wa_factory.instances[0]
|
||||
state = wa.safe_psql("state", options="'-c ztimelineid={}'".format(ztli), sslmode='disable')[0][0]
|
||||
print(state)
|
||||
|
||||
@@ -85,13 +85,17 @@ class PgProtocol:
|
||||
self.port = port
|
||||
self.username = username or getpass.getuser()
|
||||
|
||||
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str:
|
||||
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, **kwargs) -> str:
|
||||
"""
|
||||
Build a libpq connection string for the Postgres instance.
|
||||
"""
|
||||
|
||||
username = username or self.username
|
||||
return f'host={self.host} port={self.port} user={username} dbname={dbname}'
|
||||
connstr = f'host={self.host} port={self.port} user={username} dbname={dbname}'
|
||||
for k, v in kwargs.items():
|
||||
connstr += " {}={}".format(k, v)
|
||||
print("connstr is {}".format(connstr))
|
||||
return connstr
|
||||
|
||||
# autocommit=True here by default because that's what we need most of the time
|
||||
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
|
||||
@@ -458,9 +462,10 @@ def read_pid(path):
|
||||
return int(Path(path).read_text())
|
||||
|
||||
|
||||
class WalAcceptor:
|
||||
class WalAcceptor(PgProtocol):
|
||||
""" An object representing a running wal acceptor daemon. """
|
||||
def __init__(self, wa_binpath, data_dir, port, num):
|
||||
super().__init__(host='127.0.0.1', port=port)
|
||||
self.wa_binpath = wa_binpath
|
||||
self.data_dir = data_dir
|
||||
self.port = port
|
||||
|
||||
@@ -30,6 +30,7 @@ crc32c = "0.6.0"
|
||||
parse_duration = "2.1.1"
|
||||
walkdir = "2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
hex = "0.4.3"
|
||||
|
||||
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
|
||||
|
||||
@@ -213,8 +213,8 @@ impl ReceiveWalConn {
|
||||
self.peer_addr, server_info.system_id, server_info.timeline_id,
|
||||
);
|
||||
// FIXME: also check that the system identifier matches
|
||||
self.timeline.set(server_info.timeline_id)?;
|
||||
self.timeline.get().load_control_file(&self.conf)?;
|
||||
self.timeline
|
||||
.set(&self.conf, server_info.timeline_id, true)?;
|
||||
|
||||
let mut my_info = self.timeline.get().get_info();
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to
|
||||
//! pageserver/any other consumer.
|
||||
//! pageserver/any other consumer and answering to some utility queries.
|
||||
//!
|
||||
|
||||
use crate::replication::ReplicationConn;
|
||||
@@ -12,7 +12,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::postgres_backend;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor};
|
||||
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, JSON_OID, TEXT_OID};
|
||||
|
||||
/// Handler for streaming WAL from acceptor
|
||||
pub struct SendWalHandler {
|
||||
@@ -26,10 +26,14 @@ pub struct SendWalHandler {
|
||||
impl postgres_backend::Handler for SendWalHandler {
|
||||
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
|
||||
match sm.params.get("ztimelineid") {
|
||||
Some(ref ztimelineid) => {
|
||||
let ztlid = ZTimelineId::from_str(ztimelineid)?;
|
||||
self.timeline.set(ztlid)?;
|
||||
}
|
||||
Some(ref ztimelineid) => match ZTimelineId::from_str(ztimelineid) {
|
||||
Ok(ztlid) => {
|
||||
self.timeline.set(&self.conf, ztlid, false)?;
|
||||
}
|
||||
Err(e) => {
|
||||
bail!("failed to parse ztimelineid: {}", e)
|
||||
}
|
||||
},
|
||||
_ => bail!("timelineid is required"),
|
||||
}
|
||||
if let Some(app_name) = sm.params.get("application_name") {
|
||||
@@ -41,13 +45,18 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
||||
self.handle_identify_system(pgb)?;
|
||||
Ok(())
|
||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
|
||||
Ok(())
|
||||
} else if query_string.starts_with(b"state") {
|
||||
self.handle_state(pgb)?;
|
||||
} else if query_string.to_ascii_lowercase().starts_with(b"set ") {
|
||||
// have it because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
// on connect
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else {
|
||||
bail!("Unexpected command {:?}", query_string);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +84,7 @@ impl SendWalHandler {
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor {
|
||||
name: b"systemid",
|
||||
typoid: 25,
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
@@ -87,13 +96,13 @@ impl SendWalHandler {
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"xlogpos",
|
||||
typoid: 25,
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
RowDescriptor {
|
||||
name: b"dbname",
|
||||
typoid: 25,
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
},
|
||||
@@ -107,4 +116,21 @@ impl SendWalHandler {
|
||||
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send current state of safekeeper
|
||||
fn handle_state(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let my_info = self.timeline.get().get_info();
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
|
||||
name: b"state",
|
||||
typoid: JSON_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
}]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[Some(
|
||||
serde_json::to_string(&my_info).unwrap().as_bytes(),
|
||||
)]))?
|
||||
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,10 +55,12 @@ impl SharedState {
|
||||
}
|
||||
|
||||
/// Load and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
pub fn load_control_file(
|
||||
&mut self,
|
||||
conf: &WalAcceptorConf,
|
||||
timelineid: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<()> {
|
||||
if self.control_file.is_some() {
|
||||
info!("control file for timeline {} is already open", timelineid);
|
||||
@@ -69,13 +71,17 @@ impl SharedState {
|
||||
.data_dir
|
||||
.join(timelineid.to_string())
|
||||
.join(CONTROL_FILE_NAME);
|
||||
info!("loading control file {}", control_file_path.display());
|
||||
match OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&control_file_path)
|
||||
{
|
||||
info!(
|
||||
"loading control file {}, create={}",
|
||||
control_file_path.display(),
|
||||
create
|
||||
);
|
||||
let mut opts = OpenOptions::new();
|
||||
opts.read(true).write(true);
|
||||
if create {
|
||||
opts.create(true);
|
||||
}
|
||||
match opts.open(&control_file_path) {
|
||||
Ok(file) => {
|
||||
// Lock file to prevent two or more active wal_acceptors
|
||||
match file.try_lock_exclusive() {
|
||||
@@ -91,29 +97,36 @@ impl SharedState {
|
||||
self.control_file = Some(file);
|
||||
|
||||
let cfile_ref = self.control_file.as_mut().unwrap();
|
||||
match SafeKeeperInfo::des_from(cfile_ref) {
|
||||
Err(e) => {
|
||||
warn!("read from {:?} failed: {}", control_file_path, e);
|
||||
if cfile_ref.metadata().unwrap().len() == 0 {
|
||||
if !create {
|
||||
bail!("control file is empty");
|
||||
}
|
||||
Ok(info) => {
|
||||
if info.magic != SK_MAGIC {
|
||||
bail!("Invalid control file magic: {}", info.magic);
|
||||
} else {
|
||||
match SafeKeeperInfo::des_from(cfile_ref) {
|
||||
Err(e) => {
|
||||
bail!("failed to read control file {:?}: {}", control_file_path, e);
|
||||
}
|
||||
if info.format_version != SK_FORMAT_VERSION {
|
||||
bail!(
|
||||
"Incompatible format version: {} vs. {}",
|
||||
info.format_version,
|
||||
SK_FORMAT_VERSION
|
||||
);
|
||||
Ok(info) => {
|
||||
if info.magic != SK_MAGIC {
|
||||
bail!("Invalid control file magic: {}", info.magic);
|
||||
}
|
||||
if info.format_version != SK_FORMAT_VERSION {
|
||||
bail!(
|
||||
"Incompatible format version: {} vs. {}",
|
||||
info.format_version,
|
||||
SK_FORMAT_VERSION
|
||||
);
|
||||
}
|
||||
self.info = info;
|
||||
}
|
||||
self.info = info;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
panic!(
|
||||
bail!(
|
||||
"Failed to open control file {:?}: {}",
|
||||
&control_file_path, e
|
||||
&control_file_path,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -198,9 +211,9 @@ impl Timeline {
|
||||
shared_state.hs_feedback.clone()
|
||||
}
|
||||
|
||||
pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> {
|
||||
pub fn load_control_file(&self, conf: &WalAcceptorConf, create: bool) -> Result<()> {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.load_control_file(conf, self.timelineid)
|
||||
shared_state.load_control_file(conf, self.timelineid, create)
|
||||
}
|
||||
|
||||
pub fn save_control_file(&self, sync: bool) -> Result<()> {
|
||||
@@ -211,17 +224,23 @@ impl Timeline {
|
||||
|
||||
// Utilities needed by various Connection-like objects
|
||||
pub trait TimelineTools {
|
||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>;
|
||||
fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool)
|
||||
-> Result<()>;
|
||||
fn get(&self) -> &Arc<Timeline>;
|
||||
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
|
||||
}
|
||||
|
||||
impl TimelineTools for Option<Arc<Timeline>> {
|
||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> {
|
||||
fn set(
|
||||
&mut self,
|
||||
conf: &WalAcceptorConf,
|
||||
timeline_id: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<()> {
|
||||
// We will only set the timeline once. If it were to ever change,
|
||||
// anyone who cloned the Arc would be out of date.
|
||||
assert!(self.is_none());
|
||||
*self = Some(GlobalTimelines::store(timeline_id)?);
|
||||
*self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -243,11 +262,16 @@ lazy_static! {
|
||||
}
|
||||
|
||||
/// A zero-sized struct used to manage access to the global timelines map.
|
||||
struct GlobalTimelines;
|
||||
pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Store a new timeline into the global TIMELINES map.
|
||||
fn store(timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
|
||||
/// Get a timeline with control file loaded from the global TIMELINES map.
|
||||
/// If control file doesn't exist and create=false, bails out.
|
||||
pub fn get(
|
||||
conf: &WalAcceptorConf,
|
||||
timeline_id: ZTimelineId,
|
||||
create: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let mut timelines = TIMELINES.lock().unwrap();
|
||||
|
||||
match timelines.get(&timeline_id) {
|
||||
@@ -258,9 +282,10 @@ impl GlobalTimelines {
|
||||
|
||||
let shared_state = SharedState::new();
|
||||
|
||||
let new_tid = Arc::new(Timeline::new(timeline_id, shared_state));
|
||||
timelines.insert(timeline_id, Arc::clone(&new_tid));
|
||||
Ok(new_tid)
|
||||
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
|
||||
new_tli.load_control_file(conf, create)?;
|
||||
timelines.insert(timeline_id, Arc::clone(&new_tli));
|
||||
Ok(new_tli)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,7 +116,12 @@ impl PostgresBackend {
|
||||
Some(FeMessage::StartupMessage(m)) => {
|
||||
trace!("got startup message {:?}", m);
|
||||
|
||||
handler.startup(self, &m)?;
|
||||
if let Err(e) = handler.startup(self, &m) {
|
||||
// try to send error to the client
|
||||
let errmsg = format!("{}", e);
|
||||
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
match m.kind {
|
||||
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
||||
|
||||
@@ -366,6 +366,7 @@ pub struct XLogDataBody<'a> {
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
pub const TEXT_OID: Oid = 25;
|
||||
pub const JSON_OID: Oid = 114;
|
||||
// single text column
|
||||
pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor {
|
||||
name: b"data",
|
||||
|
||||
Reference in New Issue
Block a user