diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 3797ac39d1..72a436e25f 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -561,6 +561,20 @@ impl InterpretedWalReader { // Update internal and external state, then reset the WAL stream // if required. let senders = self.shard_senders.entry(shard_id).or_default(); + + // Clean up any shard senders that have dropped out before adding the new + // one. This avoids a build up of dead senders. + senders.retain(|sender| { + let closed = sender.tx.is_closed(); + + if closed { + let sender_id = ShardSenderId::new(shard_id, sender.sender_id); + tracing::info!("Removed shard sender {}", sender_id); + } + + !closed + }); + let new_sender_id = match senders.last() { Some(sender) => sender.sender_id.next(), None => SenderId::first()