Compare commits

...

4 Commits

Author SHA1 Message Date
Conrad Ludgate
18370b1e25 one more simple_query 2024-12-04 09:42:15 +00:00
Conrad Ludgate
209c720142 fix other uses of simple_query 2024-12-04 09:42:15 +00:00
Conrad Ludgate
c1d220ad4a fix 2024-12-04 09:42:15 +00:00
Conrad Ludgate
b23fd32f29 chore: update rust-postgres fork 2024-12-04 09:42:15 +00:00
10 changed files with 47 additions and 32 deletions

21
Cargo.lock generated
View File

@@ -4170,8 +4170,8 @@ dependencies = [
[[package]] [[package]]
name = "postgres" name = "postgres"
version = "0.19.4" version = "0.19.9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [ dependencies = [
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
@@ -4183,10 +4183,10 @@ dependencies = [
[[package]] [[package]]
name = "postgres-protocol" name = "postgres-protocol"
version = "0.6.4" version = "0.6.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [ dependencies = [
"base64 0.20.0", "base64 0.22.1",
"byteorder", "byteorder",
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
@@ -4197,7 +4197,6 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
"sha2", "sha2",
"stringprep", "stringprep",
"tokio",
] ]
[[package]] [[package]]
@@ -4218,8 +4217,8 @@ dependencies = [
[[package]] [[package]]
name = "postgres-types" name = "postgres-types"
version = "0.2.4" version = "0.2.8"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [ dependencies = [
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
@@ -6542,8 +6541,8 @@ dependencies = [
[[package]] [[package]]
name = "tokio-postgres" name = "tokio-postgres"
version = "0.7.7" version = "0.7.12"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"byteorder", "byteorder",
@@ -6558,9 +6557,11 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"postgres-protocol", "postgres-protocol",
"postgres-types", "postgres-types",
"rand 0.8.5",
"socket2", "socket2",
"tokio", "tokio",
"tokio-util", "tokio-util",
"whoami",
] ]
[[package]] [[package]]

View File

@@ -211,10 +211,10 @@ env_logger = "0.10"
log = "0.4" log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
## Local libraries ## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" } compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -254,7 +254,7 @@ tonic-build = "0.12"
[patch.crates-io] [patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork. # Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
################# Binary contents sections ################# Binary contents sections

View File

@@ -30,7 +30,8 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
match client.simple_query(query).await { match client.simple_query(query).await {
Result::Ok(result) => { Result::Ok(result) => {
if result.len() != 1 { // one extra item for row description
if result.len() != 2 {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"expected 1 query results, but got {}", "expected 1 query results, but got {}",
result.len() result.len()

View File

@@ -134,7 +134,7 @@ fn try_acquire_lsn_lease(
let mut client = config.connect(NoTls)?; let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn); let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let res = client.simple_query(&cmd)?; let res = client.simple_query(&cmd)?;
let msg = match res.first() { let msg = match res.get(1) {
Some(msg) => msg, Some(msg) => msg,
None => bail!("empty response"), None => bail!("empty response"),
}; };

View File

@@ -37,7 +37,7 @@ pub async fn ping_safekeeper(
// Parse result // Parse result
info!("done with {}", id); info!("done with {}", id);
if let postgres::SimpleQueryMessage::Row(row) = &result[0] { if let postgres::SimpleQueryMessage::Row(row) = &result[1] {
use std::str::FromStr; use std::str::FromStr;
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse { let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?, flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,

View File

@@ -288,7 +288,7 @@ impl StorageController {
// But tokio-postgres fork doesn't have this upstream commit: // But tokio-postgres fork doesn't have this upstream commit:
// https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79 // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
// => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399 // => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
.user(&username()) .user(username())
.dbname(DB_NAME) .dbname(DB_NAME)
.connect(tokio_postgres::NoTls) .connect(tokio_postgres::NoTls)
.await .await

View File

@@ -32,9 +32,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
_query_string: &str, _query_string: &str,
) -> Result<(), QueryError> { ) -> Result<(), QueryError> {
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"hey", b"column",
)]))? )]))?
.write_message_noflush(&BeMessage::DataRow(&[Some("hey".as_bytes())]))? .write_message_noflush(&BeMessage::DataRow(&[Some("data".as_bytes())]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Ok(()) Ok(())
} }
@@ -64,10 +64,17 @@ async fn simple_select() {
} }
}); });
let first_val = &(client.simple_query("SELECT 42;").await.expect("select"))[0]; let resp = client.simple_query("SELECT 42;").await.expect("select");
if let SimpleQueryMessage::Row(row) = first_val { if let SimpleQueryMessage::RowDescription(desc) = &resp[0] {
let first_col = desc[0].name();
assert_eq!(first_col, "column");
} else {
panic!("expected SimpleQueryMessage::RowDescription");
}
if let SimpleQueryMessage::Row(row) = &resp[1] {
let first_col = row.get(0).expect("first column"); let first_col = row.get(0).expect("first column");
assert_eq!(first_col, "hey"); assert_eq!(first_col, "data");
} else { } else {
panic!("expected SimpleQueryMessage::Row"); panic!("expected SimpleQueryMessage::Row");
} }
@@ -140,10 +147,17 @@ async fn simple_select_ssl() {
} }
}); });
let first_val = &(client.simple_query("SELECT 42;").await.expect("select"))[0]; let resp = client.simple_query("SELECT 42;").await.expect("select");
if let SimpleQueryMessage::Row(row) = first_val { if let SimpleQueryMessage::RowDescription(desc) = &resp[0] {
let first_col = desc[0].name();
assert_eq!(first_col, "column");
} else {
panic!("expected SimpleQueryMessage::RowDescription");
}
if let SimpleQueryMessage::Row(row) = &resp[1] {
let first_col = row.get(0).expect("first column"); let first_col = row.get(0).expect("first column");
assert_eq!(first_col, "hey"); assert_eq!(first_col, "data");
} else { } else {
panic!("expected SimpleQueryMessage::Row"); panic!("expected SimpleQueryMessage::Row");
} }

View File

@@ -126,7 +126,7 @@ impl PgConnectionConfig {
// Use `tokio_postgres::Config` instead of `postgres::Config` because // Use `tokio_postgres::Config` instead of `postgres::Config` because
// the former supports more options to fiddle with later. // the former supports more options to fiddle with later.
let mut config = tokio_postgres::Config::new(); let mut config = tokio_postgres::Config::new();
config.host(&self.host().to_string()).port(self.port); config.host(self.host().to_string()).port(self.port);
if let Some(password) = &self.password { if let Some(password) = &self.password {
config.password(password); config.password(password);
} }
@@ -146,8 +146,7 @@ impl PgConnectionConfig {
// establishing a new connection. // establishing a new connection.
#[allow(unstable_name_collisions)] #[allow(unstable_name_collisions)]
config.options( config.options(
&self self.options
.options
.iter() .iter()
.map(|s| { .map(|s| {
if s.contains(['\\', ' ']) { if s.contains(['\\', ' ']) {

View File

@@ -695,7 +695,7 @@ async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
// extract the row contents into an IdentifySystem struct. // extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here. // written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.first() { if let Some(SimpleQueryMessage::Row(first_row)) = response.get(1) {
Ok(IdentifySystem { Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?, systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?, timeline: get_parse(first_row, 1)?,

View File

@@ -339,7 +339,7 @@ async fn recovery_stream(
let cfg = wal_stream_connection_config(connection_conf_args)?; let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config(); let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn). // It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id)); cfg.application_name(format!("safekeeper_{}", conf.my_id));
cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical); cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
let connect_timeout = Duration::from_millis(10000); let connect_timeout = Duration::from_millis(10000);