move walreceiver module under timeline

Walreceiver is a per-timeline abstraction. Move it there to reflect
the hierarchy of abstractions and task_mgr tasks.
The code that sets up the global storage_broker client
is not timeline-scoped. So, break it out into a separate module.

The motivation for this change is to prepare the code base for replacing
the task_mgr global task registry with a more ownership-oriented
approach to manage task lifetimes.

I removed TaskStateUpdate::Init because, after doing the changes,
rustc warned that it was never constructed.
A quick search through the commit history shows that this
has always been true since

    commit fb68d01449
    Author: Dmitry Rodionov <dmitry@neon.tech>
    Date:   Mon Sep 26 23:57:02 2022 +0300

        Preserve task result in TaskHandle by keeping join handle around (#2521)

So, the warning is not an indication of some accidental code removal.

This is PR: https://github.com/neondatabase/neon/pull/3456
This commit is contained in:
Christian Schwarz
2023-01-24 19:00:18 +01:00
committed by Christian Schwarz
parent 2388981311
commit 99399c112a
7 changed files with 61 additions and 55 deletions

View File

@@ -250,7 +250,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
let signals = signals::install_shutdown_handlers()?;
// Launch broker client
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
// Initialize authentication for incoming connections
let auth = match &conf.auth_type {

View File

@@ -0,0 +1,48 @@
//! The broker client instance of the pageserver, created during pageserver startup.
//! Used by each timelines' [`walreceiver`].
use crate::config::PageServerConf;
use anyhow::Context;
use once_cell::sync::OnceCell;
use storage_broker::BrokerClientChannel;
use tracing::*;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}

View File

@@ -1,5 +1,6 @@
mod auth;
pub mod basebackup;
pub mod broker_client;
pub mod config;
pub mod consumption_metrics;
pub mod context;
@@ -16,7 +17,6 @@ pub mod tenant;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;

View File

@@ -1,5 +1,7 @@
//!
mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
use fail::fail_point;
@@ -23,6 +25,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::is_broker_client_initialized;
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -59,11 +62,11 @@ use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;

View File

@@ -23,58 +23,15 @@
mod connection_manager;
mod walreceiver_connection;
use crate::config::PageServerConf;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use anyhow::Context;
use once_cell::sync::OnceCell;
use std::future::Future;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use connection_manager::spawn_connection_manager_task;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}
/// A handle of an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
/// and a cancellation token that it can listen to for earlier interrupts.
@@ -95,7 +52,6 @@ pub enum TaskEvent<E> {
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
Init,
Started,
Progress(E),
}

View File

@@ -11,11 +11,12 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use super::TaskStateUpdate;
use crate::broker_client::get_broker_client;
use crate::context::RequestContext;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use pageserver_api::models::TimelineState;
@@ -28,10 +29,7 @@ use storage_broker::Streaming;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{
exponential_backoff, walreceiver::get_broker_client, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
};
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use utils::{
id::{NodeId, TenantTimelineId},
@@ -149,7 +147,7 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(TaskStateUpdate::Init | TaskStateUpdate::Started) => {},
TaskEvent::Update(TaskStateUpdate::Started) => {},
TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
if new_status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received

View File

@@ -22,8 +22,9 @@ use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use super::TaskStateUpdate;
use crate::context::RequestContext;
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{
task_mgr,
task_mgr::TaskKind,