mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
callmemaybe. review code cleanup
This commit is contained in:
@@ -184,7 +184,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
);
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let conf_cloned = conf.clone();
|
||||
let wal_acceptor_thread = thread::Builder::new()
|
||||
.name("WAL acceptor thread".into())
|
||||
|
||||
@@ -1,18 +1,22 @@
|
||||
//!
|
||||
//! Callmemaybe thread is responsible for periodically requesting
|
||||
//! Callmemaybe module is responsible for periodically requesting
|
||||
//! pageserver to initiate wal streaming.
|
||||
//!
|
||||
//! Other threads can use CallmeEvent messages to subscribe or unsubscribe
|
||||
//! from the call list.
|
||||
//!
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Result;
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::runtime;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::task;
|
||||
use tokio_postgres::NoTls;
|
||||
use tracing::*;
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
async fn request_callback(
|
||||
@@ -33,21 +37,14 @@ async fn request_callback(
|
||||
}
|
||||
});
|
||||
|
||||
// Send callmemaybe request
|
||||
let listen_pg_addr = listen_pg_addr_str
|
||||
.to_socket_addrs()
|
||||
.unwrap()
|
||||
.next()
|
||||
.unwrap();
|
||||
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
|
||||
let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str);
|
||||
let me_conf: postgres::config::Config = me_connstr.parse().unwrap();
|
||||
let (host, port) = connection_host_port(&me_conf);
|
||||
|
||||
let callme = format!(
|
||||
"callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
|
||||
tenantid,
|
||||
timelineid,
|
||||
&listen_pg_addr.ip().to_string(),
|
||||
listen_pg_addr.port(),
|
||||
timelineid,
|
||||
tenantid
|
||||
tenantid, timelineid, host, port, timelineid, tenantid
|
||||
);
|
||||
|
||||
let _ = client.simple_query(&callme).await?;
|
||||
@@ -55,7 +52,7 @@ async fn request_callback(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn thread_main(conf: SafeKeeperConf, rx: Receiver<CallmeEvent>) -> Result<()> {
|
||||
pub fn thread_main(conf: SafeKeeperConf, rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
|
||||
let runtime = runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
@@ -64,6 +61,7 @@ pub fn thread_main(conf: SafeKeeperConf, rx: Receiver<CallmeEvent>) -> Result<()
|
||||
runtime.block_on(main_loop(conf, rx))
|
||||
}
|
||||
|
||||
/// Messages to the callmemaybe thread
|
||||
#[derive(Debug)]
|
||||
pub enum CallmeEvent {
|
||||
// add new subscription to the list
|
||||
@@ -77,17 +75,13 @@ pub enum CallmeEvent {
|
||||
Resume(ZTenantId, ZTimelineId),
|
||||
}
|
||||
|
||||
struct SubscriptionStateInner {
|
||||
handle: Option<task::JoinHandle<()>>,
|
||||
last_call_time: Instant,
|
||||
paused: bool,
|
||||
}
|
||||
|
||||
struct SubscriptionState {
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
pageserver_connstr: String,
|
||||
inner: Mutex<SubscriptionStateInner>,
|
||||
handle: Option<task::JoinHandle<()>>,
|
||||
last_call_time: Instant,
|
||||
paused: bool,
|
||||
}
|
||||
|
||||
impl SubscriptionState {
|
||||
@@ -96,63 +90,29 @@ impl SubscriptionState {
|
||||
timelineid: ZTimelineId,
|
||||
pageserver_connstr: String,
|
||||
) -> SubscriptionState {
|
||||
let state_inner = SubscriptionStateInner {
|
||||
handle: None,
|
||||
last_call_time: Instant::now(),
|
||||
paused: false,
|
||||
};
|
||||
|
||||
SubscriptionState {
|
||||
tenantid,
|
||||
timelineid,
|
||||
pageserver_connstr,
|
||||
inner: Mutex::new(state_inner),
|
||||
handle: None,
|
||||
last_call_time: Instant::now(),
|
||||
paused: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn pause(&self) {
|
||||
let mut state_inner = self.inner.lock().unwrap();
|
||||
state_inner.paused = true;
|
||||
|
||||
if let Some(handle) = state_inner.handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
fn pause(&mut self) {
|
||||
self.paused = true;
|
||||
self.abort_handle();
|
||||
}
|
||||
|
||||
fn resume(&self) {
|
||||
let mut state_inner = self.inner.lock().unwrap();
|
||||
state_inner.paused = false;
|
||||
fn resume(&mut self) {
|
||||
self.paused = false;
|
||||
}
|
||||
|
||||
fn call(&self, recall_period: Duration, listen_pg_addr: String) {
|
||||
let mut state_inner = self.inner.lock().unwrap();
|
||||
|
||||
// Ignore call request if this subscription is paused
|
||||
if state_inner.paused {
|
||||
debug!(
|
||||
"ignore call request for paused subscription
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.tenantid, self.timelineid
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it too early to recall
|
||||
if state_inner.handle.is_some() && state_inner.last_call_time.elapsed() < recall_period {
|
||||
debug!(
|
||||
"too early to recall. state_inner.last_call_time.elapsed: {:?}, recall_period: {:?}
|
||||
tenantid: {}, timelineid: {}",
|
||||
state_inner.last_call_time, recall_period, self.tenantid, self.timelineid
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// If previous task didn't complete in recall_period, it must be hanging,
|
||||
// so don't wait for it forever, just abort it and try again.
|
||||
//
|
||||
// Most likely, the task have already successfully completed
|
||||
// and abort() won't have any effect.
|
||||
if let Some(handle) = state_inner.handle.take() {
|
||||
// Most likely, the task have already successfully completed
|
||||
// and abort() won't have any effect.
|
||||
fn abort_handle(&mut self) {
|
||||
if let Some(handle) = self.handle.take() {
|
||||
handle.abort();
|
||||
|
||||
let timelineid = self.timelineid;
|
||||
@@ -162,111 +122,128 @@ impl SubscriptionState {
|
||||
if err.is_cancelled() {
|
||||
warn!("callback task for timelineid={} tenantid={} was cancelled before spawning a new one",
|
||||
timelineid, tenantid);
|
||||
} else {
|
||||
error!(
|
||||
"callback task for timelineid={} tenantid={} failed: {}",
|
||||
timelineid, tenantid, err
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, recall_period: Duration, listen_pg_addr: String) {
|
||||
// Ignore call request if this subscription is paused
|
||||
if self.paused {
|
||||
debug!(
|
||||
"ignore call request for paused subscription
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.tenantid, self.timelineid
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it too early to recall
|
||||
if self.handle.is_some() && self.last_call_time.elapsed() < recall_period {
|
||||
debug!(
|
||||
"too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?}
|
||||
tenantid: {}, timelineid: {}",
|
||||
self.last_call_time, recall_period, self.tenantid, self.timelineid
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// If previous task didn't complete in recall_period, it must be hanging,
|
||||
// so don't wait for it forever, just abort it and try again.
|
||||
self.abort_handle();
|
||||
|
||||
let timelineid = self.timelineid;
|
||||
let tenantid = self.tenantid;
|
||||
let pageserver_connstr = self.pageserver_connstr.clone();
|
||||
state_inner.handle = Some(tokio::spawn(async move {
|
||||
self.handle = Some(tokio::spawn(async move {
|
||||
request_callback(pageserver_connstr, listen_pg_addr, timelineid, tenantid)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
"callmemaybe. request_callback for timelineid={} tenantid={} failed: {}",
|
||||
"callback task for timelineid={} tenantid={} failed: {}",
|
||||
timelineid, tenantid, e
|
||||
);
|
||||
})
|
||||
)
|
||||
});
|
||||
}));
|
||||
|
||||
// Update last_call_time
|
||||
state_inner.last_call_time = Instant::now();
|
||||
self.last_call_time = Instant::now();
|
||||
debug!(
|
||||
"new call spawned. time {:?}
|
||||
tenantid: {}, timelineid: {}",
|
||||
state_inner.last_call_time, self.tenantid, self.timelineid
|
||||
self.last_call_time, self.tenantid, self.timelineid
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SubscriptionStateInner {
|
||||
impl Drop for SubscriptionState {
|
||||
fn drop(&mut self) {
|
||||
if let Some(handle) = self.handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
self.abort_handle();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn main_loop(conf: SafeKeeperConf, mut rx: Receiver<CallmeEvent>) -> Result<()> {
|
||||
pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
|
||||
let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId), SubscriptionState>> =
|
||||
Mutex::new(HashMap::new());
|
||||
|
||||
loop {
|
||||
let call_iteration = tokio::select! {
|
||||
request = rx.recv() => {
|
||||
match request {
|
||||
Some(request) =>
|
||||
tokio::select! {
|
||||
request = rx.recv() =>
|
||||
{
|
||||
match request.ok_or_else(|| anyhow!("done"))?
|
||||
{
|
||||
CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
|
||||
{
|
||||
match request
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(mut sub) = subscriptions.insert((tenantid, timelineid),
|
||||
SubscriptionState::new(tenantid, timelineid, pageserver_connstr))
|
||||
{
|
||||
CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
|
||||
{
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
subscriptions.insert((tenantid, timelineid), SubscriptionState::new(tenantid, timelineid, pageserver_connstr)) ;
|
||||
debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
true
|
||||
},
|
||||
CallmeEvent::Unsubscribe(tenantid, timelineid) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
subscriptions.remove(&(tenantid, timelineid));
|
||||
debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
false
|
||||
},
|
||||
CallmeEvent::Pause(tenantid, timelineid) => {
|
||||
let subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
|
||||
{
|
||||
sub.pause();
|
||||
}
|
||||
debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
false
|
||||
},
|
||||
CallmeEvent::Resume(tenantid, timelineid) => {
|
||||
let mut call_iteration = false;
|
||||
let subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
|
||||
{
|
||||
sub.resume();
|
||||
debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
call_iteration = true;
|
||||
}
|
||||
call_iteration
|
||||
},
|
||||
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
|
||||
}
|
||||
debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
// all senders disconnected
|
||||
None => {
|
||||
return Ok(());
|
||||
CallmeEvent::Unsubscribe(tenantid, timelineid) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
subscriptions.remove(&(tenantid, timelineid));
|
||||
debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
CallmeEvent::Pause(tenantid, timelineid) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
|
||||
{
|
||||
sub.pause();
|
||||
};
|
||||
debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
CallmeEvent::Resume(tenantid, timelineid) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
|
||||
{
|
||||
sub.resume();
|
||||
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
|
||||
};
|
||||
|
||||
debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
|
||||
timelineid, tenantid);
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(conf.recall_period) => { true },
|
||||
_ = tokio::time::sleep(conf.recall_period) => {
|
||||
let mut subscriptions = subscriptions.lock().unwrap();
|
||||
|
||||
for (&(_tenantid, _timelineid), state) in subscriptions.iter_mut() {
|
||||
state.call(conf.recall_period, conf.listen_pg_addr.clone());
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if call_iteration {
|
||||
let subscriptions = subscriptions.lock().unwrap();
|
||||
|
||||
for (&(_tenantid, _timelineid), state) in subscriptions.iter() {
|
||||
let listen_pg_addr = conf.listen_pg_addr.clone();
|
||||
|
||||
state.call(conf.recall_period, listen_pg_addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,7 @@
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
<<<<<<< HEAD
|
||||
use postgres::{Client, Config, NoTls};
|
||||
use tracing::*;
|
||||
=======
|
||||
use log::*;
|
||||
>>>>>>> callmemaybe refactoring
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
@@ -24,7 +19,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
pub struct ReceiveWalConn<'pg> {
|
||||
/// Postgres connection
|
||||
@@ -103,17 +98,23 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
let _guard = match self.pageserver_connstr {
|
||||
Some(ref pageserver_connstr) => {
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
// Add far as replication in postgres is initiated by receiver
|
||||
// we should use callmemaybe mechanism.
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
let tx_clone = swh.tx.clone();
|
||||
let pageserver_connstr = pageserver_connstr.to_owned();
|
||||
swh.tx
|
||||
.blocking_send(CallmeEvent::Subscribe(
|
||||
.send(CallmeEvent::Subscribe(
|
||||
tenant_id,
|
||||
timelineid,
|
||||
pageserver_connstr,
|
||||
))
|
||||
.unwrap();
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
"failed to send Subscribe request to callmemaybe thread {}",
|
||||
e
|
||||
);
|
||||
});
|
||||
|
||||
// create a guard to unsubscribe callback, when this wal_stream will exit
|
||||
Some(SendWalHandlerGuard {
|
||||
@@ -140,7 +141,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
struct SendWalHandlerGuard {
|
||||
tx: Sender<CallmeEvent>,
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
tenant_id: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
}
|
||||
@@ -148,7 +149,12 @@ struct SendWalHandlerGuard {
|
||||
impl Drop for SendWalHandlerGuard {
|
||||
fn drop(&mut self) {
|
||||
self.tx
|
||||
.blocking_send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
|
||||
.unwrap();
|
||||
.send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
"failed to send Unsubscribe request to callmemaybe thread {}",
|
||||
e
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody
|
||||
use zenith_utils::sock_split::ReadStream;
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
|
||||
@@ -90,7 +90,7 @@ impl Drop for ReplicationConnGuard {
|
||||
// and current ReplicationConnGuard is tied to the background thread
|
||||
// that receives feedback.
|
||||
struct ReplicationStreamGuard {
|
||||
tx: Sender<CallmeEvent>,
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
tenant_id: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
}
|
||||
@@ -103,8 +103,10 @@ impl Drop for ReplicationStreamGuard {
|
||||
self.tenant_id, self.timelineid);
|
||||
|
||||
self.tx
|
||||
.blocking_send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
|
||||
.unwrap();
|
||||
.send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Resume request to callmemaybe thread {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,21 +253,27 @@ impl ReplicationConn {
|
||||
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
|
||||
|
||||
// Don't spam pageserver with callmemaybe queries
|
||||
// when connection is already established.
|
||||
// when replication connection with pageserver is already established.
|
||||
let _guard = {
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
let tenant_id = swh.tenantid.unwrap();
|
||||
let tx_clone = swh.tx.clone();
|
||||
swh.tx
|
||||
.blocking_send(CallmeEvent::Pause(tenant_id, timelineid))
|
||||
.unwrap();
|
||||
if swh.appname == Some("wal_proposer_recovery".to_string()) {
|
||||
None
|
||||
} else {
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
let tenant_id = swh.tenantid.unwrap();
|
||||
let tx_clone = swh.tx.clone();
|
||||
swh.tx
|
||||
.send(CallmeEvent::Pause(tenant_id, timelineid))
|
||||
.unwrap_or_else(|e| {
|
||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
||||
});
|
||||
|
||||
// create a guard to subscribe callback again, when this connection will exit
|
||||
Some(ReplicationStreamGuard {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
})
|
||||
// create a guard to subscribe callback again, when this connection will exit
|
||||
Some(ReplicationStreamGuard {
|
||||
tx: tx_clone,
|
||||
tenant_id,
|
||||
timelineid,
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
// switch to copy
|
||||
|
||||
@@ -19,7 +19,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use crate::timeline::CreateControlFile;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
/// Handler for streaming WAL from acceptor
|
||||
pub struct SendWalHandler {
|
||||
@@ -30,7 +30,7 @@ pub struct SendWalHandler {
|
||||
pub timelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
//sender to communicate with callmemaybe thread
|
||||
pub tx: Sender<CallmeEvent>,
|
||||
pub tx: UnboundedSender<CallmeEvent>,
|
||||
}
|
||||
|
||||
impl postgres_backend::Handler for SendWalHandler {
|
||||
@@ -103,7 +103,7 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
}
|
||||
|
||||
impl SendWalHandler {
|
||||
pub fn new(conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Self {
|
||||
pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
|
||||
SendWalHandler {
|
||||
conf,
|
||||
appname: None,
|
||||
|
||||
@@ -11,14 +11,14 @@ use tracing::*;
|
||||
use crate::callmemaybe::CallmeEvent;
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::SafeKeeperConf;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
pub fn thread_main(
|
||||
conf: SafeKeeperConf,
|
||||
listener: TcpListener,
|
||||
tx: Sender<CallmeEvent>,
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
match listener.accept() {
|
||||
@@ -51,7 +51,11 @@ fn get_tid() -> u64 {
|
||||
|
||||
/// This is run by `thread_main` above, inside a background thread.
|
||||
///
|
||||
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Result<()> {
|
||||
fn handle_socket(
|
||||
socket: TcpStream,
|
||||
conf: SafeKeeperConf,
|
||||
tx: UnboundedSender<CallmeEvent>,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("", tid = ?get_tid()).entered();
|
||||
|
||||
socket.set_nodelay(true)?;
|
||||
|
||||
Reference in New Issue
Block a user