Temporary workaround for timeout retry errors

This commit is contained in:
Erik Grinaker
2025-07-09 09:49:15 +02:00
parent d63f1d259a
commit 08399672be

View File

@@ -580,6 +580,14 @@ impl StreamPool {
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
//
// TODO: on timeouts, retries will send additional requests for the same request ID, which
// get piled up behind the already sent request. We should, at the very least, add deadlines
// for the request such that it's cancelled on the server side. This also means that only
// the last caller gets the response, and it may get a response for an earlier attempt. This
// needs rethinking.
//
// TODO: consider allocating separate request IDs for each retry.
let mut callers = HashMap::new();
// Process requests and responses.
@@ -593,13 +601,6 @@ impl StreamPool {
};
// Store the response channel by request ID.
if callers.contains_key(&req.request_id) {
// Error on request ID duplicates. Ignore callers that went away.
_ = resp_tx.send(Err(tonic::Status::invalid_argument(
format!("duplicate request ID: {}", req.request_id),
)));
continue;
}
callers.insert(req.request_id, resp_tx);
// Send the request on the stream. Bail out if the send fails.
@@ -615,9 +616,10 @@ impl StreamPool {
return Ok(())
};
// Send the response to the caller. Ignore errors if the caller went away.
// Send the response to the caller. Ignore errors if the caller went away. This
// may have happened with e.g. a timeout retry, where multiple requests may have
// been sent for the same ID.
let Some(resp_tx) = callers.remove(&resp.request_id) else {
warn!("received response for unknown request ID: {}", resp.request_id);
continue;
};
_ = resp_tx.send(Ok(resp));