From 290884ea3b87fa7657d69465634e82897ee2b5d3 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 10 Mar 2023 10:13:31 +0400 Subject: [PATCH] Fix too many arguments in read_network clippy complain. --- safekeeper/src/receive_wal.rs | 108 +++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 49 deletions(-) diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index b7cf5a7310..0652ad0676 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -68,10 +68,17 @@ impl SafekeeperPostgresHandler { // sends, so this avoids deadlocks. let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?; let peer_addr = *pgb.get_peer_addr(); + let network_reader = NetworkReader { + ttid: self.ttid, + conn_id: self.conn_id, + pgb_reader: &mut pgb_reader, + peer_addr, + acceptor_handle: &mut acceptor_handle, + }; let res = tokio::select! { // todo: add read|write .context to these errors - r = read_network(self.ttid, self.conn_id, &mut pgb_reader, peer_addr, msg_tx, &mut acceptor_handle, msg_rx, reply_tx) => r, - r = write_network(pgb, reply_rx) => r, + r = network_reader.run(msg_tx, msg_rx, reply_tx) => r, + r = network_write(pgb, reply_rx) => r, }; // Join pg backend back. @@ -104,6 +111,55 @@ impl SafekeeperPostgresHandler { } } +struct NetworkReader<'a> { + ttid: TenantTimelineId, + conn_id: ConnectionId, + pgb_reader: &'a mut PostgresBackendReader, + peer_addr: SocketAddr, + // WalAcceptor is spawned when we learn server info from walproposer and + // create timeline; handle is put here. + acceptor_handle: &'a mut Option>>, +} + +impl<'a> NetworkReader<'a> { + async fn run( + self, + msg_tx: Sender, + msg_rx: Receiver, + reply_tx: Sender, + ) -> Result<(), CopyStreamHandlerEnd> { + // Receive information about server to create timeline, if not yet. + let next_msg = read_message(self.pgb_reader).await?; + let tli = match next_msg { + ProposerAcceptorMessage::Greeting(ref greeting) => { + info!( + "start handshake with walproposer {} sysid {} timeline {}", + self.peer_addr, greeting.system_id, greeting.tli, + ); + let server_info = ServerInfo { + pg_version: greeting.pg_version, + system_id: greeting.system_id, + wal_seg_size: greeting.wal_seg_size, + }; + GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await? + } + _ => { + return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!( + "unexpected message {next_msg:?} instead of greeting" + ))) + } + }; + + *self.acceptor_handle = Some( + WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id) + .context("spawn WalAcceptor thread")?, + ); + + // Forward all messages to WalAcceptor + read_network_loop(self.pgb_reader, msg_tx, next_msg).await + } +} + /// Read next message from walproposer. /// TODO: Return Ok(None) on graceful termination. async fn read_message( @@ -114,52 +170,6 @@ async fn read_message( Ok(msg) } -/// Read messages from socket and pass it to WalAcceptor thread. Returns Ok(()) -/// if msg_tx closed; it must mean WalAcceptor terminated, joining it should -/// tell the error. -async fn read_network( - ttid: TenantTimelineId, - conn_id: ConnectionId, - pgb_reader: &mut PostgresBackendReader, - peer_addr: SocketAddr, - msg_tx: Sender, - // WalAcceptor is spawned when we learn server info from walproposer and - // create timeline; handle is put here. - acceptor_handle: &mut Option>>, - msg_rx: Receiver, - reply_tx: Sender, -) -> Result<(), CopyStreamHandlerEnd> { - // Receive information about server to create timeline, if not yet. - let next_msg = read_message(pgb_reader).await?; - let tli = match next_msg { - ProposerAcceptorMessage::Greeting(ref greeting) => { - info!( - "start handshake with walproposer {} sysid {} timeline {}", - peer_addr, greeting.system_id, greeting.tli, - ); - let server_info = ServerInfo { - pg_version: greeting.pg_version, - system_id: greeting.system_id, - wal_seg_size: greeting.wal_seg_size, - }; - GlobalTimelines::create(ttid, server_info, Lsn::INVALID, Lsn::INVALID).await? - } - _ => { - return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!( - "unexpected message {next_msg:?} instead of greeting" - ))) - } - }; - - *acceptor_handle = Some( - WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, conn_id) - .context("spawn WalAcceptor thread")?, - ); - - // Forward all messages to WalAcceptor - read_network_loop(pgb_reader, msg_tx, next_msg).await -} - async fn read_network_loop( pgb_reader: &mut PostgresBackendReader, msg_tx: Sender, @@ -176,7 +186,7 @@ async fn read_network_loop( /// Read replies from WalAcceptor and pass them back to socket. Returns Ok(()) /// if reply_rx closed; it must mean WalAcceptor terminated, joining it should /// tell the error. -async fn write_network( +async fn network_write( pgb_writer: &mut PostgresBackend, mut reply_rx: Receiver, ) -> Result<(), CopyStreamHandlerEnd> {