rollback safety

This commit is contained in:
Conrad Ludgate
2025-07-22 15:58:47 +01:00
parent 11294ca322
commit 539652fa4e
4 changed files with 25 additions and 40 deletions

View File

@@ -306,7 +306,8 @@ impl Client {
// "DISCARD SEQUENCES;": deallocates all cached sequence state
let _responses = self.inner_mut().send_simple_query(
"CLOSE ALL;
"ROLLBACK;
CLOSE ALL;
SET SESSION AUTHORIZATION DEFAULT;
RESET ALL;
DEALLOCATE ALL;

View File

@@ -7,7 +7,6 @@ use std::time::Duration;
use clashmap::ClashMap;
use parking_lot::RwLock;
use postgres_client::ReadyForQueryStatus;
use rand::Rng;
use smol_str::ToSmolStr;
use tracing::{Span, debug, info, warn};
@@ -714,12 +713,6 @@ impl ClientInnerExt for postgres_client::Client {
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is not idle");
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {

View File

@@ -735,9 +735,7 @@ impl QueryData {
match batch_result {
// The query successfully completed.
Ok(status) => {
discard.check_idle(status);
Ok(_) => {
let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
Ok(json_output)
}
@@ -793,7 +791,7 @@ impl BatchQueryData {
{
Ok(json_output) => {
info!("commit");
let status = transaction
transaction
.commit()
.await
.inspect_err(|_| {
@@ -802,7 +800,6 @@ impl BatchQueryData {
discard.discard();
})
.map_err(SqlOverHttpError::Postgres)?;
discard.check_idle(status);
json_output
}
Err(SqlOverHttpError::Cancelled(_)) => {
@@ -815,17 +812,6 @@ impl BatchQueryData {
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(err) => {
info!("rollback");
let status = transaction
.rollback()
.await
.inspect_err(|_| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
discard.discard();
})
.map_err(SqlOverHttpError::Postgres)?;
discard.check_idle(status);
return Err(err);
}
};
@@ -1012,12 +998,6 @@ impl Client {
}
impl Discard<'_> {
fn check_idle(&mut self, status: ReadyForQueryStatus) {
match self {
Discard::Remote(discard) => discard.check_idle(status),
Discard::Local(discard) => discard.check_idle(status),
}
}
fn discard(&mut self) {
match self {
Discard::Remote(discard) => discard.discard(),

View File

@@ -17,9 +17,6 @@ if TYPE_CHECKING:
from typing import Any
GET_CONNECTION_PID_QUERY = "SELECT pid FROM pg_stat_activity WHERE state = 'active'"
@pytest.mark.asyncio
async def test_http_pool_begin_1(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
@@ -479,7 +476,7 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
def get_pid(status: int, pw: str, user="http_auth") -> Any:
return static_proxy.http_query(
GET_CONNECTION_PID_QUERY,
"SELECT pg_backend_pid() as pid",
[],
user=user,
password=pw,
@@ -573,23 +570,37 @@ def test_http_pool_begin(static_proxy: NeonProxy):
query(200, "SELECT 1;") # Query that should succeed regardless of the transaction
def test_sql_over_http_pool_idle(static_proxy: NeonProxy):
def test_sql_over_http_pool_tx_reuse(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth2 with password 'http' superuser")
def query(status: int, query: str) -> Any:
def query(status: int, query: str, *args) -> Any:
return static_proxy.http_query(
query,
[],
args,
user="http_auth2",
password="http",
expected_code=status,
)
pid1 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"]
def query_pid_txid() -> Any:
result = query(
200,
"SELECT pg_backend_pid() as pid, pg_current_xact_id() as txid",
)
return result["rows"][0]
res0 = query_pid_txid()
time.sleep(0.02)
query(200, "BEGIN")
pid2 = query(200, GET_CONNECTION_PID_QUERY)["rows"][0]["pid"]
assert pid1 != pid2
res1 = query_pid_txid()
res2 = query_pid_txid()
assert res0["pid"] == res1["pid"], "connection should be reused"
assert res0["pid"] == res2["pid"], "connection should be reused"
assert res1["txid"] != res2["txid"], "txid should be different"
@pytest.mark.timeout(60)