Compare commits

...

4 Commits

Author SHA1 Message Date
Conrad Ludgate
18370b1e25 one more simple_query 2024-12-04 09:42:15 +00:00
Conrad Ludgate
209c720142 fix other uses of simple_query 2024-12-04 09:42:15 +00:00
Conrad Ludgate
c1d220ad4a fix 2024-12-04 09:42:15 +00:00
Conrad Ludgate
b23fd32f29 chore: update rust-postgres fork 2024-12-04 09:42:15 +00:00
10 changed files with 47 additions and 32 deletions

21
Cargo.lock generated
View File

@@ -4170,8 +4170,8 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
version = "0.19.9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4183,10 +4183,10 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
version = "0.6.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [
"base64 0.20.0",
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
@@ -4197,7 +4197,6 @@ dependencies = [
"rand 0.8.5",
"sha2",
"stringprep",
"tokio",
]
[[package]]
@@ -4218,8 +4217,8 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
version = "0.2.8"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [
"bytes",
"fallible-iterator",
@@ -6542,8 +6541,8 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
version = "0.7.12"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon-rebase1#a88729d425754cb64423aa7140caac9f1cc9bb2a"
dependencies = [
"async-trait",
"byteorder",
@@ -6558,9 +6557,11 @@ dependencies = [
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"rand 0.8.5",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]

View File

@@ -211,10 +211,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -254,7 +254,7 @@ tonic-build = "0.12"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon-rebase1" }
################# Binary contents sections

View File

@@ -30,7 +30,8 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
match client.simple_query(query).await {
Result::Ok(result) => {
if result.len() != 1 {
// one extra item for row description
if result.len() != 2 {
return Err(anyhow::anyhow!(
"expected 1 query results, but got {}",
result.len()

View File

@@ -134,7 +134,7 @@ fn try_acquire_lsn_lease(
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
let msg = match res.get(1) {
Some(msg) => msg,
None => bail!("empty response"),
};

View File

@@ -37,7 +37,7 @@ pub async fn ping_safekeeper(
// Parse result
info!("done with {}", id);
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
if let postgres::SimpleQueryMessage::Row(row) = &result[1] {
use std::str::FromStr;
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,

View File

@@ -288,7 +288,7 @@ impl StorageController {
// But tokio-postgres fork doesn't have this upstream commit:
// https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
// => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
.user(&username())
.user(username())
.dbname(DB_NAME)
.connect(tokio_postgres::NoTls)
.await

View File

@@ -32,9 +32,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
_query_string: &str,
) -> Result<(), QueryError> {
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"hey",
b"column",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[Some("hey".as_bytes())]))?
.write_message_noflush(&BeMessage::DataRow(&[Some("data".as_bytes())]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Ok(())
}
@@ -64,10 +64,17 @@ async fn simple_select() {
}
});
let first_val = &(client.simple_query("SELECT 42;").await.expect("select"))[0];
if let SimpleQueryMessage::Row(row) = first_val {
let resp = client.simple_query("SELECT 42;").await.expect("select");
if let SimpleQueryMessage::RowDescription(desc) = &resp[0] {
let first_col = desc[0].name();
assert_eq!(first_col, "column");
} else {
panic!("expected SimpleQueryMessage::RowDescription");
}
if let SimpleQueryMessage::Row(row) = &resp[1] {
let first_col = row.get(0).expect("first column");
assert_eq!(first_col, "hey");
assert_eq!(first_col, "data");
} else {
panic!("expected SimpleQueryMessage::Row");
}
@@ -140,10 +147,17 @@ async fn simple_select_ssl() {
}
});
let first_val = &(client.simple_query("SELECT 42;").await.expect("select"))[0];
if let SimpleQueryMessage::Row(row) = first_val {
let resp = client.simple_query("SELECT 42;").await.expect("select");
if let SimpleQueryMessage::RowDescription(desc) = &resp[0] {
let first_col = desc[0].name();
assert_eq!(first_col, "column");
} else {
panic!("expected SimpleQueryMessage::RowDescription");
}
if let SimpleQueryMessage::Row(row) = &resp[1] {
let first_col = row.get(0).expect("first column");
assert_eq!(first_col, "hey");
assert_eq!(first_col, "data");
} else {
panic!("expected SimpleQueryMessage::Row");
}

View File

@@ -126,7 +126,7 @@ impl PgConnectionConfig {
// Use `tokio_postgres::Config` instead of `postgres::Config` because
// the former supports more options to fiddle with later.
let mut config = tokio_postgres::Config::new();
config.host(&self.host().to_string()).port(self.port);
config.host(self.host().to_string()).port(self.port);
if let Some(password) = &self.password {
config.password(password);
}
@@ -146,8 +146,7 @@ impl PgConnectionConfig {
// establishing a new connection.
#[allow(unstable_name_collisions)]
config.options(
&self
.options
self.options
.iter()
.map(|s| {
if s.contains(['\\', ' ']) {

View File

@@ -695,7 +695,7 @@ async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
// extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(1) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,

View File

@@ -339,7 +339,7 @@ async fn recovery_stream(
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
cfg.application_name(format!("safekeeper_{}", conf.my_id));
cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
let connect_timeout = Duration::from_millis(10000);