mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
remove replies for store/remove ops
This commit is contained in:
@@ -34,7 +34,6 @@ pub enum CancelKeyOp {
|
||||
StoreCancelKey {
|
||||
key: String,
|
||||
value: String,
|
||||
resp_tx: Option<oneshot::Sender<anyhow::Result<()>>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
expire: i64, // TTL for key
|
||||
},
|
||||
@@ -46,7 +45,6 @@ pub enum CancelKeyOp {
|
||||
RemoveCancelKey {
|
||||
key: String,
|
||||
field: String,
|
||||
resp_tx: Option<oneshot::Sender<anyhow::Result<()>>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
}
|
||||
@@ -104,13 +102,6 @@ impl Pipeline {
|
||||
fn add_command_no_reply(&mut self, cmd: Cmd) {
|
||||
self.inner.add_command(cmd).ignore();
|
||||
}
|
||||
|
||||
fn add_command(&mut self, cmd: Cmd, reply: Option<CancelReplyOp>) {
|
||||
match reply {
|
||||
Some(reply) => self.add_command_with_reply(cmd, reply),
|
||||
None => self.add_command_no_reply(cmd),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CancelKeyOp {
|
||||
@@ -120,13 +111,10 @@ impl CancelKeyOp {
|
||||
CancelKeyOp::StoreCancelKey {
|
||||
key,
|
||||
value,
|
||||
resp_tx,
|
||||
_guard,
|
||||
expire,
|
||||
} => {
|
||||
let reply =
|
||||
resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard });
|
||||
pipe.add_command(Cmd::hset(&key, "data", value), reply);
|
||||
pipe.add_command_no_reply(Cmd::hset(&key, "data", value));
|
||||
pipe.add_command_no_reply(Cmd::expire(key, expire));
|
||||
}
|
||||
CancelKeyOp::GetCancelData {
|
||||
@@ -137,15 +125,8 @@ impl CancelKeyOp {
|
||||
let reply = CancelReplyOp::GetCancelData { resp_tx, _guard };
|
||||
pipe.add_command_with_reply(Cmd::hget(key, "data"), reply);
|
||||
}
|
||||
CancelKeyOp::RemoveCancelKey {
|
||||
key,
|
||||
field,
|
||||
resp_tx,
|
||||
_guard,
|
||||
} => {
|
||||
let reply =
|
||||
resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard });
|
||||
pipe.add_command(Cmd::hdel(key, field), reply);
|
||||
CancelKeyOp::RemoveCancelKey { key, field, _guard } => {
|
||||
pipe.add_command_no_reply(Cmd::hdel(key, field));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -153,54 +134,26 @@ impl CancelKeyOp {
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelReplyOp {
|
||||
StoreCancelKey {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<()>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
GetCancelData {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<String>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
RemoveCancelKey {
|
||||
resp_tx: oneshot::Sender<anyhow::Result<()>>,
|
||||
_guard: CancelChannelSizeGuard<'static>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CancelReplyOp {
|
||||
fn send_err(self, e: anyhow::Error) {
|
||||
match self {
|
||||
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
|
||||
resp_tx
|
||||
.send(Err(e))
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_value(self, v: redis::Value) {
|
||||
match self {
|
||||
CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
resp_tx
|
||||
.send(send)
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::GetCancelData { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
@@ -209,14 +162,6 @@ impl CancelReplyOp {
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
|
||||
let send =
|
||||
FromRedisValue::from_owned_redis_value(v).context("could not parse value");
|
||||
resp_tx
|
||||
.send(send)
|
||||
.inspect_err(|_| tracing::debug!("could not send reply"))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -380,6 +325,7 @@ impl CancellationHandler {
|
||||
|
||||
Ok(Some(cancel_closure))
|
||||
}
|
||||
|
||||
/// Try to cancel a running query for the corresponding connection.
|
||||
/// If the cancellation key is not found, it will be published to Redis.
|
||||
/// check_allowed - if true, check if the IP is allowed to cancel the query.
|
||||
@@ -526,7 +472,6 @@ impl Session {
|
||||
let op = CancelKeyOp::StoreCancelKey {
|
||||
key: self.redis_key.clone(),
|
||||
value: closure_json,
|
||||
resp_tx: None,
|
||||
_guard: Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
@@ -550,7 +495,6 @@ impl Session {
|
||||
let op = CancelKeyOp::RemoveCancelKey {
|
||||
key: self.redis_key.clone(),
|
||||
field: "data".to_string(),
|
||||
resp_tx: None,
|
||||
_guard: Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
|
||||
Reference in New Issue
Block a user