mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 21:10:38 +00:00
Compare commits
7 Commits
wal_back_p
...
netstat-lo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4187be11c7 | ||
|
|
13f4e173c9 | ||
|
|
85116a8375 | ||
|
|
b05757d3a2 | ||
|
|
b4bf0a01bb | ||
|
|
0907fe638d | ||
|
|
6ed0c23731 |
@@ -256,6 +256,8 @@ jobs:
|
||||
if << parameters.run_in_parallel >>; then
|
||||
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
|
||||
fi;
|
||||
./netstat-script.sh &
|
||||
NS_PID=$!
|
||||
# Run the tests.
|
||||
#
|
||||
# The junit.xml file allows CircleCI to display more fine-grained test information
|
||||
@@ -267,6 +269,8 @@ jobs:
|
||||
# -s is not used to prevent pytest from capturing output, because tests are running
|
||||
# in parallel and logs are mixed between different tests
|
||||
pipenv run pytest --junitxml=$TEST_OUTPUT/junit.xml --tb=short --verbose -rA $TEST_SELECTION $EXTRA_PARAMS
|
||||
kill $NS_PID
|
||||
awk '/===/ {if (count) print count; print; count=0; next} {count++} END {print count}' $TEST_OUTPUT/netstat.stdout > $TEST_OUTPUT/netstat_stats.stdout
|
||||
- run:
|
||||
# CircleCI artifacts are preserved one file at a time, so skipping
|
||||
# this step isn't a good idea. If you want to extract the
|
||||
|
||||
@@ -300,7 +300,7 @@ impl PostgresNode {
|
||||
conf.append("shared_buffers", "1MB");
|
||||
conf.append("fsync", "off");
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_sender_timeout", "10s");
|
||||
conf.append("wal_sender_timeout", "0");
|
||||
conf.append("wal_level", "replica");
|
||||
conf.append("listen_addresses", &self.address.ip().to_string());
|
||||
conf.append("port", &self.address.port().to_string());
|
||||
|
||||
@@ -695,8 +695,8 @@ impl Timeline for LayeredTimeline {
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}",
|
||||
lsn, self.get_disk_consistent_lsn()
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
@@ -910,10 +910,6 @@ impl Timeline for LayeredTimeline {
|
||||
Ok(total_blocks * BLCKSZ as usize)
|
||||
}
|
||||
|
||||
fn get_disk_consistent_lsn(&self) -> Lsn {
|
||||
self.disk_consistent_lsn.load()
|
||||
}
|
||||
|
||||
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
|
||||
Box::new(LayeredTimelineWriter {
|
||||
tl: self,
|
||||
|
||||
@@ -134,7 +134,6 @@ pub trait Timeline: Send + Sync {
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
fn get_prev_record_lsn(&self) -> Lsn;
|
||||
fn get_start_lsn(&self) -> Lsn;
|
||||
fn get_disk_consistent_lsn(&self) -> Lsn;
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;
|
||||
|
||||
@@ -284,14 +284,12 @@ fn walreceiver_main(
|
||||
if let Some(last_lsn) = status_update {
|
||||
// TODO: More thought should go into what values are sent here.
|
||||
let last_lsn = PgLsn::from(u64::from(last_lsn));
|
||||
// We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server
|
||||
// may guarantee persistence of all received data. Safekeeper is not free to remove
|
||||
// WAL preceding `write_lsn`: it should not be requested by this page server.
|
||||
let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
|
||||
let write_lsn = last_lsn;
|
||||
let flush_lsn = last_lsn;
|
||||
let apply_lsn = PgLsn::from(0);
|
||||
let ts = SystemTime::now();
|
||||
const NO_REPLY: u8 = 0;
|
||||
|
||||
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ pub struct DatabaseInfo {
|
||||
pub port: u16,
|
||||
pub dbname: String,
|
||||
pub user: String,
|
||||
pub password: String,
|
||||
pub password: Option<String>,
|
||||
}
|
||||
|
||||
impl DatabaseInfo {
|
||||
@@ -24,12 +24,23 @@ impl DatabaseInfo {
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::Error::msg("cannot resolve at least one SocketAddr"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn conn_string(&self) -> String {
|
||||
format!(
|
||||
"dbname={} user={} password={}",
|
||||
self.dbname, self.user, self.password
|
||||
)
|
||||
impl From<DatabaseInfo> for tokio_postgres::Config {
|
||||
fn from(db_info: DatabaseInfo) -> Self {
|
||||
let mut config = tokio_postgres::Config::new();
|
||||
|
||||
config
|
||||
.host(&db_info.host)
|
||||
.port(db_info.port)
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
if let Some(password) = db_info.password {
|
||||
config.password(password);
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -145,18 +145,18 @@ fn main() -> anyhow::Result<()> {
|
||||
println!("Starting mgmt on {}", state.conf.mgmt_address);
|
||||
let mgmt_listener = TcpListener::bind(state.conf.mgmt_address)?;
|
||||
|
||||
let threads = vec![
|
||||
let threads = [
|
||||
// Spawn a thread to listen for connections. It will spawn further threads
|
||||
// for each connection.
|
||||
thread::Builder::new()
|
||||
.name("Proxy thread".into())
|
||||
.name("Listener thread".into())
|
||||
.spawn(move || proxy::thread_main(state, pageserver_listener))?,
|
||||
thread::Builder::new()
|
||||
.name("Mgmt thread".into())
|
||||
.spawn(move || mgmt::thread_main(state, mgmt_listener))?,
|
||||
];
|
||||
|
||||
for t in threads.into_iter() {
|
||||
for t in threads {
|
||||
t.join().unwrap()?;
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ use anyhow::bail;
|
||||
use tokio_postgres::NoTls;
|
||||
|
||||
use rand::Rng;
|
||||
use std::io::Write;
|
||||
use std::{io, sync::mpsc::channel, thread};
|
||||
use zenith_utils::postgres_backend::Stream;
|
||||
use zenith_utils::postgres_backend::{PostgresBackend, ProtoState};
|
||||
@@ -28,11 +27,13 @@ pub fn thread_main(
|
||||
println!("accepted connection from {}", peer_addr);
|
||||
socket.set_nodelay(true).unwrap();
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(err) = proxy_conn_main(state, socket) {
|
||||
println!("error: {}", err);
|
||||
}
|
||||
});
|
||||
thread::Builder::new()
|
||||
.name("Proxy thread".into())
|
||||
.spawn(move || {
|
||||
if let Err(err) = proxy_conn_main(state, socket) {
|
||||
println!("error: {}", err);
|
||||
}
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,6 +159,7 @@ impl ProxyConnection {
|
||||
fn handle_existing_user(&mut self) -> anyhow::Result<DatabaseInfo> {
|
||||
// ask password
|
||||
rand::thread_rng().fill(&mut self.md5_salt);
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::AuthenticationMD5Password(&self.md5_salt))?;
|
||||
self.pgb.state = ProtoState::Authentication; // XXX
|
||||
@@ -250,51 +252,68 @@ databases without opening the browser.
|
||||
/// Create a TCP connection to a postgres database, authenticate with it, and receive the ReadyForQuery message
|
||||
async fn connect_to_db(db_info: DatabaseInfo) -> anyhow::Result<tokio::net::TcpStream> {
|
||||
let mut socket = tokio::net::TcpStream::connect(db_info.socket_addr()?).await?;
|
||||
let config = db_info.conn_string().parse::<tokio_postgres::Config>()?;
|
||||
let config = tokio_postgres::Config::from(db_info);
|
||||
let _ = config.connect_raw(&mut socket, NoTls).await?;
|
||||
Ok(socket)
|
||||
}
|
||||
|
||||
/// Concurrently proxy both directions of the client and server connections
|
||||
fn proxy(
|
||||
client_read: ReadStream,
|
||||
client_write: WriteStream,
|
||||
server_read: ReadStream,
|
||||
server_write: WriteStream,
|
||||
(client_read, client_write): (ReadStream, WriteStream),
|
||||
(server_read, server_write): (ReadStream, WriteStream),
|
||||
) -> anyhow::Result<()> {
|
||||
fn do_proxy(mut reader: ReadStream, mut writer: WriteStream) -> io::Result<()> {
|
||||
std::io::copy(&mut reader, &mut writer)?;
|
||||
writer.flush()?;
|
||||
writer.shutdown(std::net::Shutdown::Both)
|
||||
fn do_proxy(mut reader: impl io::Read, mut writer: WriteStream) -> io::Result<u64> {
|
||||
/// FlushWriter will make sure that every message is sent as soon as possible
|
||||
struct FlushWriter<W>(W);
|
||||
|
||||
impl<W: io::Write> io::Write for FlushWriter<W> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
// `std::io::copy` is guaranteed to exit if we return an error,
|
||||
// so we can afford to lose `res` in case `flush` fails
|
||||
let res = self.0.write(buf);
|
||||
if res.is_ok() {
|
||||
self.0.flush()?;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.0.flush()
|
||||
}
|
||||
}
|
||||
|
||||
let res = std::io::copy(&mut reader, &mut FlushWriter(&mut writer));
|
||||
writer.shutdown(std::net::Shutdown::Both)?;
|
||||
res
|
||||
}
|
||||
|
||||
let client_to_server_jh = thread::spawn(move || do_proxy(client_read, server_write));
|
||||
|
||||
let res1 = do_proxy(server_read, client_write);
|
||||
let res2 = client_to_server_jh.join().unwrap();
|
||||
res1?;
|
||||
res2?;
|
||||
do_proxy(server_read, client_write)?;
|
||||
client_to_server_jh.join().unwrap()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Proxy a client connection to a postgres database
|
||||
fn proxy_pass(pgb: PostgresBackend, db_info: DatabaseInfo) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
let db_stream = runtime.block_on(connect_to_db(db_info))?;
|
||||
let db_stream = db_stream.into_std()?;
|
||||
db_stream.set_nonblocking(false)?;
|
||||
let db_stream = {
|
||||
// We'll get rid of this once migration to async is complete
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let db_stream = zenith_utils::sock_split::BidiStream::from_tcp(db_stream);
|
||||
let (db_read, db_write) = db_stream.split();
|
||||
let stream = runtime.block_on(connect_to_db(db_info))?.into_std()?;
|
||||
stream.set_nonblocking(false)?;
|
||||
stream
|
||||
};
|
||||
|
||||
let stream = match pgb.into_stream() {
|
||||
let db = zenith_utils::sock_split::BidiStream::from_tcp(db_stream);
|
||||
|
||||
let client = match pgb.into_stream() {
|
||||
Stream::Bidirectional(bidi_stream) => bidi_stream,
|
||||
_ => bail!("invalid stream"),
|
||||
};
|
||||
|
||||
let (client_read, client_write) = stream.split();
|
||||
proxy(client_read, client_write, db_read, db_write)
|
||||
proxy(client.split(), db.split())
|
||||
}
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.utils import lsn_from_hex, lsn_to_hex
|
||||
from typing import List
|
||||
|
||||
log = getLogger('root.wal_acceptor_async')
|
||||
@@ -102,6 +104,38 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
|
||||
await pg_conn.close()
|
||||
|
||||
|
||||
async def wait_for_lsn(safekeeper: WalAcceptor,
|
||||
tenant_id: str,
|
||||
timeline_id: str,
|
||||
wait_lsn: str,
|
||||
polling_interval=1,
|
||||
timeout=600):
|
||||
"""
|
||||
Poll flush_lsn from safekeeper until it's greater or equal than
|
||||
provided wait_lsn. To do that, timeline_status is fetched from
|
||||
safekeeper every polling_interval seconds.
|
||||
"""
|
||||
|
||||
started_at = time.time()
|
||||
client = safekeeper.http_client()
|
||||
|
||||
flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
log.info(
|
||||
f'Safekeeper at port {safekeeper.port.pg} has flush_lsn {flush_lsn}, waiting for lsn {wait_lsn}'
|
||||
)
|
||||
|
||||
while lsn_from_hex(wait_lsn) > lsn_from_hex(flush_lsn):
|
||||
elapsed = time.time() - started_at
|
||||
if elapsed > timeout:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting for safekeeper at port {safekeeper.port.pg} to reach {wait_lsn}, current lsn is {flush_lsn}"
|
||||
)
|
||||
|
||||
await asyncio.sleep(polling_interval)
|
||||
flush_lsn = client.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
log.debug(f'safekeeper port={safekeeper.port.pg} flush_lsn={flush_lsn} wait_lsn={wait_lsn}')
|
||||
|
||||
|
||||
# This test will run several iterations and check progress in each of them.
|
||||
# On each iteration 1 acceptor is stopped, and 2 others should allow
|
||||
# background workers execute transactions. In the end, state should remain
|
||||
@@ -114,6 +148,9 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
|
||||
iterations = 6
|
||||
|
||||
pg_conn = await pg.connect_async()
|
||||
tenant_id = await pg_conn.fetchval("show zenith.zenith_tenant")
|
||||
timeline_id = await pg_conn.fetchval("show zenith.zenith_timeline")
|
||||
|
||||
bank = BankClient(pg_conn, n_accounts=n_accounts, init_amount=init_amount)
|
||||
# create tables and initial balances
|
||||
await bank.initdb()
|
||||
@@ -125,14 +162,18 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
|
||||
workers.append(asyncio.create_task(worker))
|
||||
|
||||
for it in range(iterations):
|
||||
victim = acceptors[it % len(acceptors)]
|
||||
victim_idx = it % len(acceptors)
|
||||
victim = acceptors[victim_idx]
|
||||
victim.stop()
|
||||
|
||||
# Wait till previous victim recovers so it is ready for the next
|
||||
# iteration by making any writing xact.
|
||||
conn = await pg.connect_async()
|
||||
await conn.execute('UPDATE bank_accs SET amount = amount WHERE uid = 1', timeout=120)
|
||||
await conn.close()
|
||||
flush_lsn = await pg_conn.fetchval('SELECT pg_current_wal_flush_lsn()')
|
||||
flush_lsn = lsn_to_hex(flush_lsn)
|
||||
log.info(f'Postgres flush_lsn {flush_lsn}')
|
||||
|
||||
# Wait until alive safekeepers catch up with postgres
|
||||
for idx, safekeeper in enumerate(acceptors):
|
||||
if idx != victim_idx:
|
||||
await wait_for_lsn(safekeeper, tenant_id, timeline_id, flush_lsn)
|
||||
|
||||
stats.reset()
|
||||
await asyncio.sleep(period_time)
|
||||
|
||||
@@ -63,3 +63,9 @@ def global_counter() -> int:
|
||||
def lsn_to_hex(num: int) -> str:
|
||||
""" Convert lsn from int to standard hex notation. """
|
||||
return "{:X}/{:X}".format(num >> 32, num & 0xffffffff)
|
||||
|
||||
|
||||
def lsn_from_hex(lsn_hex: str) -> int:
|
||||
""" Convert lsn from hex notation to int. """
|
||||
l, r = lsn_hex.split('/')
|
||||
return (int(l, 16) << 32) + int(r, 16)
|
||||
|
||||
@@ -1031,8 +1031,9 @@ def wa_factory(zenith_binpath: str,
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverTimelineStatus:
|
||||
class SafekeeperTimelineStatus:
|
||||
acceptor_epoch: int
|
||||
flush_lsn: str
|
||||
|
||||
|
||||
class WalAcceptorHttpClient(requests.Session):
|
||||
@@ -1043,11 +1044,12 @@ class WalAcceptorHttpClient(requests.Session):
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus:
|
||||
def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelineStatus:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
resj = res.json()
|
||||
return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'])
|
||||
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
|
||||
flush_lsn=resj['flush_lsn'])
|
||||
|
||||
|
||||
@zenfixture
|
||||
|
||||
8
test_runner/netstat-script.sh
Executable file
8
test_runner/netstat-script.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/bin/bash
|
||||
|
||||
while true; do
|
||||
echo -n "==== CURRENT TIME:" >> /tmp/test_output/netstat.stdout
|
||||
date +"%T.%N" >> /tmp/test_output/netstat.stdout
|
||||
sudo netstat -vpnoa | grep tcp | sort >> /tmp/test_output/netstat.stdout
|
||||
sleep 0.5
|
||||
done
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 462264946d...9160deb05a
@@ -49,6 +49,8 @@ struct TimelineStatus {
|
||||
commit_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
truncate_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
flush_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Report info about timeline.
|
||||
@@ -64,6 +66,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
)
|
||||
.map_err(ApiError::from_err)?;
|
||||
let sk_state = tli.get_info();
|
||||
let (flush_lsn, _) = tli.get_end_of_wal();
|
||||
|
||||
let status = TimelineStatus {
|
||||
tenant_id,
|
||||
@@ -71,6 +74,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
acceptor_state: sk_state.acceptor_state,
|
||||
commit_lsn: sk_state.commit_lsn,
|
||||
truncate_lsn: sk_state.truncate_lsn,
|
||||
flush_lsn,
|
||||
};
|
||||
Ok(json_response(StatusCode::OK, status)?)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//! with the "START_REPLICATION" message.
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
@@ -20,7 +20,7 @@ use std::{str, thread};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
|
||||
use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody};
|
||||
use zenith_utils::sock_split::ReadStream;
|
||||
|
||||
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
|
||||
@@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
|
||||
type FullTransactionId = u64;
|
||||
|
||||
/// Hot standby feedback received from replica
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct HotStandbyFeedback {
|
||||
pub ts: TimestampTz,
|
||||
pub xmin: FullTransactionId,
|
||||
@@ -49,16 +49,6 @@ impl HotStandbyFeedback {
|
||||
}
|
||||
}
|
||||
|
||||
/// Standby status update
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StandbyReply {
|
||||
pub write_lsn: Lsn, // disk consistent lSN
|
||||
pub flush_lsn: Lsn, // LSN committedby quorum
|
||||
pub apply_lsn: Lsn, // not used
|
||||
pub reply_ts: TimestampTz,
|
||||
pub reply_requested: bool,
|
||||
}
|
||||
|
||||
/// A network connection that's speaking the replication protocol.
|
||||
pub struct ReplicationConn {
|
||||
/// This is an `Option` because we will spawn a background thread that will
|
||||
@@ -66,15 +56,16 @@ pub struct ReplicationConn {
|
||||
stream_in: Option<ReadStream>,
|
||||
}
|
||||
|
||||
/// Scope guard to unregister replication connection from timeline
|
||||
struct ReplicationConnGuard {
|
||||
replica: usize, // replica internal ID assigned by timeline
|
||||
timeline: Arc<Timeline>,
|
||||
// TODO: move this to crate::timeline when there's more users
|
||||
// TODO: design a proper Timeline mock api
|
||||
trait HsFeedbackSubscriber {
|
||||
fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {}
|
||||
}
|
||||
|
||||
impl Drop for ReplicationConnGuard {
|
||||
fn drop(&mut self) {
|
||||
self.timeline.update_replica_state(self.replica, None);
|
||||
impl HsFeedbackSubscriber for Arc<Timeline> {
|
||||
#[inline(always)]
|
||||
fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
|
||||
Timeline::add_hs_feedback(self, feedback);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,33 +79,26 @@ impl ReplicationConn {
|
||||
|
||||
/// Handle incoming messages from the network.
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
|
||||
let mut state = ReplicaState::new();
|
||||
let replica = timeline.add_replica(state);
|
||||
let _guard = ReplicationConnGuard {
|
||||
replica,
|
||||
timeline: timeline.clone(),
|
||||
};
|
||||
fn background_thread(
|
||||
mut stream_in: impl Read,
|
||||
subscriber: impl HsFeedbackSubscriber,
|
||||
) -> Result<()> {
|
||||
// Wait for replica's feedback.
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match &msg {
|
||||
FeMessage::CopyData(m) => {
|
||||
// There's two possible data messages that the client is supposed to send here:
|
||||
// `HotStandbyFeedback` and `StandbyStatusUpdate`.
|
||||
// `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby
|
||||
// feedback.
|
||||
|
||||
match m.first().cloned() {
|
||||
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
|
||||
// Note: deserializing is on m[1..] because we skip the tag byte.
|
||||
state.hs_feedback = HotStandbyFeedback::des(&m[1..])
|
||||
let feedback = HotStandbyFeedback::des(&m[1..])
|
||||
.context("failed to deserialize HotStandbyFeedback")?;
|
||||
timeline.update_replica_state(replica, Some(state));
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
|
||||
let reply = StandbyReply::des(&m[1..])
|
||||
.context("failed to deserialize StandbyReply")?;
|
||||
state.disk_consistent_lsn = reply.write_lsn;
|
||||
timeline.update_replica_state(replica, Some(state));
|
||||
subscriber.add_hs_feedback(feedback);
|
||||
}
|
||||
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
|
||||
_ => warn!("unexpected message {:?}", msg),
|
||||
}
|
||||
}
|
||||
@@ -203,7 +187,7 @@ impl ReplicationConn {
|
||||
// switch to copy
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
let mut end_pos = Lsn(0);
|
||||
let mut end_pos: Lsn;
|
||||
let mut wal_file: Option<File> = None;
|
||||
|
||||
loop {
|
||||
@@ -218,18 +202,7 @@ impl ReplicationConn {
|
||||
} else {
|
||||
/* normal mode */
|
||||
let timeline = swh.timeline.get();
|
||||
if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
|
||||
end_pos = lsn
|
||||
} else {
|
||||
// timeout expired: request pageserver status
|
||||
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
sent_ptr: end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.context("Failed to send KeepAlive message")?;
|
||||
continue;
|
||||
}
|
||||
end_pos = timeline.wait_for_lsn(start_pos);
|
||||
}
|
||||
if end_pos == END_REPLICATION_MARKER {
|
||||
break;
|
||||
@@ -284,3 +257,18 @@ impl ReplicationConn {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// A no-op impl for tests
|
||||
impl HsFeedbackSubscriber for () {}
|
||||
|
||||
#[test]
|
||||
fn test_replication_conn_background_thread_eof() {
|
||||
// Test that background_thread recognizes EOF
|
||||
let stream: &[u8] = &[];
|
||||
ReplicationConn::background_thread(stream, ()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,8 +191,6 @@ pub struct AppendResponse {
|
||||
// We report back our awareness about which WAL is committed, as this is
|
||||
// a criterion for walproposer --sync mode exit
|
||||
pub commit_lsn: Lsn,
|
||||
// Min disk consistent lsn of pageservers (portion of WAL applied and written to the disk by pageservers)
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
pub hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
@@ -460,7 +458,6 @@ where
|
||||
epoch: self.s.acceptor_state.epoch,
|
||||
commit_lsn: Lsn(0),
|
||||
flush_lsn: Lsn(0),
|
||||
disk_consistent_lsn: Lsn(0),
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
};
|
||||
return Ok(AcceptorProposerMessage::AppendResponse(resp));
|
||||
@@ -570,7 +567,6 @@ where
|
||||
epoch: self.s.acceptor_state.epoch,
|
||||
flush_lsn: self.flush_lsn,
|
||||
commit_lsn: self.s.commit_lsn,
|
||||
disk_consistent_lsn: Lsn(0),
|
||||
// will be filled by caller code to avoid bothering safekeeper
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
};
|
||||
|
||||
@@ -11,9 +11,9 @@ use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::Duration;
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
|
||||
@@ -25,35 +25,6 @@ use crate::WalAcceptorConf;
|
||||
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
|
||||
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Replica status: host standby feedback + disk consistent lsn
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ReplicaState {
|
||||
/// combined disk_consistent_lsn of pageservers
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
/// combined hot standby feedback from all replicas
|
||||
pub hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
impl Default for ReplicaState {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplicaState {
|
||||
pub fn new() -> ReplicaState {
|
||||
ReplicaState {
|
||||
disk_consistent_lsn: Lsn(u64::MAX),
|
||||
hs_feedback: HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: u64::MAX,
|
||||
catalog_xmin: u64::MAX,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared state associated with database instance (tenant)
|
||||
struct SharedState {
|
||||
@@ -62,8 +33,8 @@ struct SharedState {
|
||||
/// For receiving-sending wal cooperation
|
||||
/// quorum commit LSN we've notified walsenders about
|
||||
notified_commit_lsn: Lsn,
|
||||
/// State of replicas
|
||||
replicas: Vec<Option<ReplicaState>>,
|
||||
/// combined hot standby feedback from all replicas
|
||||
hs_feedback: HotStandbyFeedback,
|
||||
}
|
||||
|
||||
// A named boolean.
|
||||
@@ -74,31 +45,6 @@ pub enum CreateControlFile {
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
/// Get combined stateof all alive replicas
|
||||
pub fn get_replicas_state(&self) -> ReplicaState {
|
||||
let mut acc = ReplicaState::new();
|
||||
for state in self.replicas.iter().flatten() {
|
||||
acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
|
||||
acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
|
||||
acc.hs_feedback.catalog_xmin =
|
||||
min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
|
||||
acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
|
||||
}
|
||||
acc
|
||||
}
|
||||
|
||||
/// Assign new replica ID. We choose first empty cell in the replicas vector
|
||||
/// or extend the vector if there are not free items.
|
||||
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
|
||||
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
|
||||
self.replicas[pos] = Some(state);
|
||||
return pos;
|
||||
}
|
||||
let pos = self.replicas.len();
|
||||
self.replicas.push(Some(state));
|
||||
pos
|
||||
}
|
||||
|
||||
/// Restore SharedState from control file. Locks the control file along the
|
||||
/// way to prevent running more than one instance of safekeeper on the same
|
||||
/// data dir.
|
||||
@@ -128,10 +74,21 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
notified_commit_lsn: Lsn(0),
|
||||
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
|
||||
replicas: Vec::new(),
|
||||
hs_feedback: HotStandbyFeedback {
|
||||
ts: 0,
|
||||
xmin: u64::MAX,
|
||||
catalog_xmin: u64::MAX,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Accumulate hot standby feedbacks from replicas
|
||||
pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) {
|
||||
self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin);
|
||||
self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin);
|
||||
self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
|
||||
}
|
||||
|
||||
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn load_control_file(
|
||||
@@ -221,27 +178,20 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Timed wait for an LSN to be committed.
|
||||
/// Wait for an LSN to be committed.
|
||||
///
|
||||
/// Returns the last committed LSN, which will be at least
|
||||
/// as high as the LSN waited for, or None if timeout expired.
|
||||
/// as high as the LSN waited for.
|
||||
///
|
||||
pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
|
||||
pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
loop {
|
||||
let commit_lsn = shared_state.notified_commit_lsn;
|
||||
// This must be `>`, not `>=`.
|
||||
if commit_lsn > lsn {
|
||||
return Some(commit_lsn);
|
||||
return commit_lsn;
|
||||
}
|
||||
let result = self
|
||||
.cond
|
||||
.wait_timeout(shared_state, POLL_STATE_TIMEOUT)
|
||||
.unwrap();
|
||||
if result.1.timed_out() {
|
||||
return None;
|
||||
}
|
||||
shared_state = result.0
|
||||
shared_state = self.cond.wait(shared_state).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,11 +219,9 @@ impl Timeline {
|
||||
// commit_lsn if we are catching up safekeeper.
|
||||
commit_lsn = shared_state.sk.commit_lsn;
|
||||
|
||||
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
|
||||
// if this is AppendResponse, fill in proper hot standby feedback
|
||||
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
|
||||
let state = shared_state.get_replicas_state();
|
||||
resp.hs_feedback = state.hs_feedback;
|
||||
resp.disk_consistent_lsn = state.disk_consistent_lsn;
|
||||
resp.hs_feedback = shared_state.hs_feedback.clone();
|
||||
}
|
||||
}
|
||||
// Ping wal sender that new data might be available.
|
||||
@@ -285,14 +233,15 @@ impl Timeline {
|
||||
self.mutex.lock().unwrap().sk.s.clone()
|
||||
}
|
||||
|
||||
pub fn add_replica(&self, state: ReplicaState) -> usize {
|
||||
// Accumulate hot standby feedbacks from replicas
|
||||
pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.add_replica(state)
|
||||
shared_state.add_hs_feedback(feedback);
|
||||
}
|
||||
|
||||
pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.replicas[id] = state;
|
||||
pub fn get_hs_feedback(&self) -> HotStandbyFeedback {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
shared_state.hs_feedback.clone()
|
||||
}
|
||||
|
||||
pub fn get_end_of_wal(&self) -> (Lsn, u32) {
|
||||
|
||||
@@ -358,7 +358,6 @@ pub enum BeMessage<'a> {
|
||||
RowDescription(&'a [RowDescriptor<'a>]),
|
||||
XLogData(XLogDataBody<'a>),
|
||||
NoticeResponse(String),
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
}
|
||||
|
||||
// One row desciption in RowDescription packet.
|
||||
@@ -410,13 +409,6 @@ pub struct XLogDataBody<'a> {
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WalSndKeepAlive {
|
||||
pub sent_ptr: u64,
|
||||
pub timestamp: i64,
|
||||
pub request_reply: bool,
|
||||
}
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
|
||||
// single text column
|
||||
@@ -729,18 +721,6 @@ impl<'a> BeMessage<'a> {
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
BeMessage::KeepAlive(req) => {
|
||||
buf.put_u8(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u8(b'k');
|
||||
buf.put_u64(req.sent_ptr);
|
||||
buf.put_i64(req.timestamp);
|
||||
buf.put_u8(if req.request_reply { 1u8 } else { 0u8 });
|
||||
Ok::<_, io::Error>(())
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -107,21 +107,12 @@ impl io::Write for WriteStream {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TlsBoxed {
|
||||
stream: BufStream,
|
||||
session: rustls::ServerSession,
|
||||
}
|
||||
|
||||
impl TlsBoxed {
|
||||
fn rustls_stream(&mut self) -> rustls::Stream<rustls::ServerSession, BufStream> {
|
||||
rustls::Stream::new(&mut self.session, &mut self.stream)
|
||||
}
|
||||
}
|
||||
type TlsStream<T> = rustls::StreamOwned<rustls::ServerSession, T>;
|
||||
|
||||
pub enum BidiStream {
|
||||
Tcp(BufStream),
|
||||
/// This variant is boxed, because [`rustls::ServerSession`] is quite larger than [`BufStream`].
|
||||
Tls(Box<TlsBoxed>),
|
||||
Tls(Box<TlsStream<BufStream>>),
|
||||
}
|
||||
|
||||
impl BidiStream {
|
||||
@@ -134,11 +125,11 @@ impl BidiStream {
|
||||
Self::Tcp(stream) => stream.get_ref().shutdown(how),
|
||||
Self::Tls(tls_boxed) => {
|
||||
if how == Shutdown::Read {
|
||||
tls_boxed.stream.get_ref().shutdown(how)
|
||||
tls_boxed.sock.get_ref().shutdown(how)
|
||||
} else {
|
||||
tls_boxed.session.send_close_notify();
|
||||
let res = tls_boxed.rustls_stream().flush();
|
||||
tls_boxed.stream.get_ref().shutdown(how)?;
|
||||
tls_boxed.sess.send_close_notify();
|
||||
let res = tls_boxed.flush();
|
||||
tls_boxed.sock.get_ref().shutdown(how)?;
|
||||
res
|
||||
}
|
||||
}
|
||||
@@ -155,7 +146,7 @@ impl BidiStream {
|
||||
(ReadStream::Tcp(reader), WriteStream::Tcp(stream))
|
||||
}
|
||||
Self::Tls(tls_boxed) => {
|
||||
let reader = tls_boxed.stream.into_reader();
|
||||
let reader = tls_boxed.sock.into_reader();
|
||||
let buffer_data = reader.buffer().to_owned();
|
||||
let read_buf_cfg = rustls_split::BufCfg::with_data(buffer_data, 8192);
|
||||
let write_buf_cfg = rustls_split::BufCfg::with_capacity(8192);
|
||||
@@ -164,7 +155,7 @@ impl BidiStream {
|
||||
let socket = Arc::try_unwrap(reader.into_inner().0).unwrap();
|
||||
|
||||
let (read_half, write_half) =
|
||||
rustls_split::split(socket, tls_boxed.session, read_buf_cfg, write_buf_cfg);
|
||||
rustls_split::split(socket, tls_boxed.sess, read_buf_cfg, write_buf_cfg);
|
||||
(ReadStream::Tls(read_half), WriteStream::Tls(write_half))
|
||||
}
|
||||
}
|
||||
@@ -175,7 +166,7 @@ impl BidiStream {
|
||||
Self::Tcp(mut stream) => {
|
||||
session.complete_io(&mut stream)?;
|
||||
assert!(!session.is_handshaking());
|
||||
Ok(Self::Tls(Box::new(TlsBoxed { stream, session })))
|
||||
Ok(Self::Tls(Box::new(TlsStream::new(session, stream))))
|
||||
}
|
||||
Self::Tls { .. } => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
@@ -189,7 +180,7 @@ impl io::Read for BidiStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
match self {
|
||||
Self::Tcp(stream) => stream.read(buf),
|
||||
Self::Tls(tls_boxed) => tls_boxed.rustls_stream().read(buf),
|
||||
Self::Tls(tls_boxed) => tls_boxed.read(buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -198,14 +189,14 @@ impl io::Write for BidiStream {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
match self {
|
||||
Self::Tcp(stream) => stream.write(buf),
|
||||
Self::Tls(tls_boxed) => tls_boxed.rustls_stream().write(buf),
|
||||
Self::Tls(tls_boxed) => tls_boxed.write(buf),
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
match self {
|
||||
Self::Tcp(stream) => stream.flush(),
|
||||
Self::Tls(tls_boxed) => tls_boxed.rustls_stream().flush(),
|
||||
Self::Tls(tls_boxed) => tls_boxed.flush(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user