mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 12:32:54 +00:00
Plumb gRPC addr through storage-controller
This commit is contained in:
@@ -1473,9 +1473,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
// fully managed by storage controller, therefore not sharded.
|
||||
(vec![(protocol, host, port)], DEFAULT_STRIPE_SIZE)
|
||||
} else {
|
||||
// TODO: plumb Pageserver gRPC ports through storage-controller.
|
||||
assert!(!args.grpc, "gRPC not supported with storage-controller yet");
|
||||
|
||||
// Look up the currently attached location of the tenant, and its striping metadata,
|
||||
// to pass these on to postgres.
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
@@ -1493,12 +1490,22 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.await?;
|
||||
}
|
||||
|
||||
anyhow::Ok((
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
))
|
||||
let pageserver = if args.grpc {
|
||||
(
|
||||
PageserverProtocol::Grpc,
|
||||
Host::parse(&shard.listen_grpc_addr.expect("no gRPC addr"))
|
||||
.expect("bad hostname"),
|
||||
shard.listen_grpc_port.expect("no gRPC port"),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
};
|
||||
|
||||
anyhow::Ok(pageserver)
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
@@ -1565,8 +1572,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
};
|
||||
vec![(protocol, host, port)]
|
||||
} else {
|
||||
// TODO: plumb gRPC ports through storage-controller.
|
||||
assert!(!args.grpc, "gRPC not supported with storage-controller yet");
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.tenant_locate(endpoint.tenant_id)
|
||||
@@ -1574,12 +1579,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Storage controller reported malformed host"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
if args.grpc {
|
||||
(
|
||||
PageserverProtocol::Grpc,
|
||||
Host::parse(&shard.listen_grpc_addr.expect("no gRPC addr"))
|
||||
.expect("bad hostname"),
|
||||
shard.listen_grpc_port.expect("no gRPC port"),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
PageserverProtocol::Libpq,
|
||||
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
@@ -265,6 +265,14 @@ impl PageServerNode {
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut grpc_host = None;
|
||||
let mut grpc_port = None;
|
||||
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
|
||||
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
|
||||
grpc_host = Some("localhost".to_string());
|
||||
grpc_port = Some(port.unwrap_or(51051));
|
||||
}
|
||||
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
@@ -273,6 +281,8 @@ impl PageServerNode {
|
||||
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: self.pg_connection_config.port(),
|
||||
grpc_host,
|
||||
grpc_port,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port,
|
||||
https_port,
|
||||
|
||||
@@ -37,6 +37,11 @@ enum Command {
|
||||
#[arg(long)]
|
||||
listen_pg_port: u16,
|
||||
|
||||
#[arg(long)]
|
||||
listen_grpc_addr: Option<String>,
|
||||
#[arg(long)]
|
||||
listen_grpc_port: Option<u16>,
|
||||
|
||||
#[arg(long)]
|
||||
listen_http_addr: String,
|
||||
#[arg(long)]
|
||||
@@ -410,6 +415,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
@@ -423,6 +430,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_https_port,
|
||||
|
||||
@@ -34,6 +34,8 @@ pub struct NodeMetadata {
|
||||
pub postgres_host: String,
|
||||
#[serde(rename = "port")]
|
||||
pub postgres_port: u16,
|
||||
pub grpc_host: Option<String>,
|
||||
pub grpc_port: Option<u16>,
|
||||
pub http_host: String,
|
||||
pub http_port: u16,
|
||||
pub https_port: Option<u16>,
|
||||
|
||||
@@ -14,6 +14,8 @@ fn test_node_metadata_v1_backward_compatibilty() {
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: None,
|
||||
grpc_port: None,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: None,
|
||||
@@ -37,6 +39,35 @@ fn test_node_metadata_v2_backward_compatibilty() {
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: None,
|
||||
grpc_port: None,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: Some(123),
|
||||
other: HashMap::new(),
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_node_metadata_v3_backward_compatibilty() {
|
||||
let v3 = serde_json::to_vec(&serde_json::json!({
|
||||
"host": "localhost",
|
||||
"port": 23,
|
||||
"grpc_host": "localhost",
|
||||
"grpc_port": 51,
|
||||
"http_host": "localhost",
|
||||
"http_port": 42,
|
||||
"https_port": 123,
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
grpc_host: Some("localhost".to_string()),
|
||||
grpc_port: Some(51),
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: Some(123),
|
||||
|
||||
@@ -53,6 +53,9 @@ pub struct NodeRegisterRequest {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
pub listen_https_port: Option<u16>,
|
||||
@@ -102,6 +105,9 @@ pub struct TenantLocateResponseShard {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
pub listen_https_port: Option<u16>,
|
||||
@@ -152,6 +158,8 @@ pub struct NodeDescribeResponse {
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
@@ -195,6 +195,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
node_id: conf.id,
|
||||
listen_pg_addr: m.postgres_host,
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_grpc_addr: m.grpc_host,
|
||||
listen_grpc_port: m.grpc_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
listen_https_port: m.https_port,
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;
|
||||
@@ -37,6 +37,9 @@ pub(crate) struct Node {
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
|
||||
listen_grpc_addr: Option<String>,
|
||||
listen_grpc_port: Option<u16>,
|
||||
|
||||
availability_zone_id: AvailabilityZone,
|
||||
|
||||
// Flag from storcon's config to use https for pageserver admin API.
|
||||
@@ -99,8 +102,8 @@ impl Node {
|
||||
self.id == register_req.node_id
|
||||
&& self.listen_http_addr == register_req.listen_http_addr
|
||||
&& self.listen_http_port == register_req.listen_http_port
|
||||
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
|
||||
// && self.listen_https_port == register_req.listen_https_port
|
||||
// Note: HTTPS and gRPC ports/addresses may change, to allow for migrations. See
|
||||
// [`Self::need_update`] for more details.
|
||||
&& self.listen_pg_addr == register_req.listen_pg_addr
|
||||
&& self.listen_pg_port == register_req.listen_pg_port
|
||||
&& self.availability_zone_id == register_req.availability_zone_id
|
||||
@@ -108,9 +111,10 @@ impl Node {
|
||||
|
||||
// Do we need to update an existing record in DB on this registration request?
|
||||
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
|
||||
// listen_https_port is checked here because it may change during migration to https.
|
||||
// After migration, this check may be moved to registration_match.
|
||||
// These are checked here, since they may change before we're fully migrated.
|
||||
self.listen_https_port != register_req.listen_https_port
|
||||
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|
||||
|| self.listen_grpc_port != register_req.listen_grpc_port
|
||||
}
|
||||
|
||||
/// For a shard located on this node, populate a response object
|
||||
@@ -124,6 +128,8 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,6 +216,8 @@ impl Node {
|
||||
listen_https_port: Option<u16>,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
listen_grpc_addr: Option<String>,
|
||||
listen_grpc_port: Option<u16>,
|
||||
availability_zone_id: AvailabilityZone,
|
||||
use_https: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
@@ -227,6 +235,8 @@ impl Node {
|
||||
listen_https_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_port,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
availability: NodeAvailability::Offline,
|
||||
availability_zone_id,
|
||||
@@ -244,6 +254,8 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port.map(|x| x as i32),
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port.map(|x| x as i32),
|
||||
availability_zone_id: self.availability_zone_id.0.clone(),
|
||||
}
|
||||
}
|
||||
@@ -268,6 +280,8 @@ impl Node {
|
||||
listen_https_port: np.listen_https_port.map(|x| x as u16),
|
||||
listen_pg_addr: np.listen_pg_addr,
|
||||
listen_pg_port: np.listen_pg_port as u16,
|
||||
listen_grpc_addr: np.listen_grpc_addr,
|
||||
listen_grpc_port: np.listen_grpc_port.map(|x| x as u16),
|
||||
availability_zone_id: AvailabilityZone(np.availability_zone_id),
|
||||
use_https,
|
||||
cancel: CancellationToken::new(),
|
||||
@@ -357,6 +371,8 @@ impl Node {
|
||||
listen_https_port: self.listen_https_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
listen_grpc_addr: self.listen_grpc_addr.clone(),
|
||||
listen_grpc_port: self.listen_grpc_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2048,6 +2048,8 @@ pub(crate) struct NodePersistence {
|
||||
pub(crate) listen_pg_port: i32,
|
||||
pub(crate) availability_zone_id: String,
|
||||
pub(crate) listen_https_port: Option<i32>,
|
||||
pub(crate) listen_grpc_addr: Option<String>,
|
||||
pub(crate) listen_grpc_port: Option<i32>,
|
||||
}
|
||||
|
||||
/// Tenant metadata health status that are stored durably.
|
||||
|
||||
@@ -945,6 +945,8 @@ pub(crate) mod test_utils {
|
||||
None,
|
||||
format!("pghost-{i}"),
|
||||
5432 + i as u16,
|
||||
None,
|
||||
None,
|
||||
az_iter
|
||||
.next()
|
||||
.cloned()
|
||||
|
||||
@@ -33,6 +33,8 @@ diesel::table! {
|
||||
listen_pg_port -> Int4,
|
||||
availability_zone_id -> Varchar,
|
||||
listen_https_port -> Nullable<Int4>,
|
||||
listen_grpc_addr -> Nullable<Varchar>,
|
||||
listen_grpc_port -> Nullable<Int4>,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1681,6 +1681,8 @@ impl Service {
|
||||
None,
|
||||
"".to_string(),
|
||||
123,
|
||||
None,
|
||||
None,
|
||||
AvailabilityZone("test_az".to_string()),
|
||||
false,
|
||||
)
|
||||
@@ -7215,6 +7217,8 @@ impl Service {
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_port,
|
||||
register_req.availability_zone_id.clone(),
|
||||
self.config.use_https_pageserver_api,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user