mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-11 15:32:56 +00:00
Remove wal_receiver mgmt API endpoint
Move all the fields that were returned by the wal_receiver endpoint into
timeline_detail. Internally, move those fields from the separate global
WAL_RECEIVER_ENTRIES hash map into the LayeredTimeline struct. That way, all
the information about a timeline is kept in one place.
In passing, I noticed that the 'thread_id' field was removed from
WalReceiverEntry in commit e5cb727572, but that commit forgot to update
openapi_spec.yml. This commit removes the field from the spec too.
This commit is contained in:
@@ -101,18 +101,6 @@ impl TenantConfigRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// A WAL receiver's data stored inside the global `WAL_RECEIVER_ENTRIES` map.
|
||||
/// We keep one WAL receiver active per timeline.
///
/// Every field is optional: the entry is first inserted with all fields set
/// to `None` and is overwritten with `Some(..)` values once a message from
/// the WAL source has been processed.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct WalReceiverEntry {
|
||||
/// connection string of the WAL source the receiver is streaming from
pub wal_source_connstr: Option<String>,
|
||||
/// LSN of the last received message; (de)serialized through its
/// `Display`/`FromStr` string form rather than as a raw number
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub last_received_msg_lsn: Option<Lsn>,
|
||||
/// the timestamp (in microseconds) of the last received message
|
||||
pub last_received_msg_ts: Option<u128>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TenantInfo {
|
||||
@@ -143,6 +131,12 @@ pub struct LocalTimelineInfo {
|
||||
pub current_logical_size_non_incremental: Option<usize>,
|
||||
pub current_physical_size_non_incremental: Option<u64>,
|
||||
pub timeline_state: LocalTimelineState,
|
||||
|
||||
pub wal_source_connstr: Option<String>,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub last_received_msg_lsn: Option<Lsn>,
|
||||
/// the timestamp (in microseconds) of the last received message
|
||||
pub last_received_msg_ts: Option<u128>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -207,54 +207,6 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: Get wal receiver's data attached to the timeline
|
||||
responses:
|
||||
"200":
|
||||
description: WalReceiverEntry
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/WalReceiverEntry"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Error when no wal receiver is running or found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -689,15 +641,6 @@ components:
|
||||
type: integer
|
||||
current_physical_size_non_incremental:
|
||||
type: integer
|
||||
|
||||
WalReceiverEntry:
|
||||
type: object
|
||||
required:
|
||||
- thread_id
|
||||
- wal_source_connstr
|
||||
properties:
|
||||
thread_id:
|
||||
type: integer
|
||||
wal_source_connstr:
|
||||
type: string
|
||||
last_received_msg_lsn:
|
||||
|
||||
@@ -91,6 +91,19 @@ fn local_timeline_info_from_loaded_timeline(
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<LocalTimelineInfo> {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
|
||||
let guard = timeline.last_received_wal.lock().unwrap();
|
||||
if let Some(info) = guard.as_ref() {
|
||||
(
|
||||
Some(info.wal_source_connstr.clone()),
|
||||
Some(info.last_received_msg_lsn),
|
||||
Some(info.last_received_msg_ts),
|
||||
)
|
||||
} else {
|
||||
(None, None, None)
|
||||
}
|
||||
};
|
||||
|
||||
let info = LocalTimelineInfo {
|
||||
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
|
||||
ancestor_lsn: {
|
||||
@@ -116,6 +129,9 @@ fn local_timeline_info_from_loaded_timeline(
|
||||
} else {
|
||||
None
|
||||
},
|
||||
wal_source_connstr,
|
||||
last_received_msg_lsn,
|
||||
last_received_msg_ts,
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
@@ -138,6 +154,9 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo
|
||||
current_physical_size: None,
|
||||
current_logical_size_non_incremental: None,
|
||||
current_physical_size_non_incremental: None,
|
||||
wal_source_connstr: None,
|
||||
last_received_msg_lsn: None,
|
||||
last_received_msg_ts: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,23 +367,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, timeline_info)
|
||||
}
|
||||
|
||||
/// HTTP handler for `GET /v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver`.
///
/// Reads `tenant_id` and `timeline_id` from the request parameters, checks
/// that the caller is permitted to access the tenant, and responds with the
/// JSON-serialized WAL receiver entry for that timeline.
///
/// # Errors
///
/// Propagates parameter-parsing and permission errors, and returns
/// `ApiError::NotFound` when no WAL receiver entry exists for the
/// tenant/timeline pair.
async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
// Permission is checked against the tenant before anything else is looked up.
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
// The lookup runs inside a tracing span tagged with both ids.
let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
|
||||
.instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.await
|
||||
// A missing entry is reported to the client as "not found".
.ok_or_else(|| {
|
||||
ApiError::NotFound(format!(
|
||||
"WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
|
||||
))
|
||||
})?;
|
||||
|
||||
json_response(StatusCode::OK, &wal_receiver_entry)
|
||||
}
|
||||
|
||||
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
|
||||
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
@@ -751,9 +753,5 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/detach",
|
||||
timeline_delete_handler,
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
|
||||
wal_receiver_get_handler,
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -67,6 +67,9 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
|
||||
// re-export for use in storage_sync.rs
|
||||
pub use crate::layered_repository::timeline::save_metadata;
|
||||
|
||||
// re-export for use in walreceiver
|
||||
pub use crate::layered_repository::timeline::WalReceiverInfo;
|
||||
|
||||
/// Parts of the `.neon/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
|
||||
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
|
||||
|
||||
@@ -290,6 +290,17 @@ pub struct LayeredTimeline {
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: AtomicIsize,
|
||||
|
||||
/// Information about the last processed message by the WAL receiver,
|
||||
/// or None if WAL receiver has not received anything for this timeline
|
||||
/// yet.
|
||||
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
|
||||
}
|
||||
|
||||
/// Status of the last message processed by the WAL receiver for a timeline.
///
/// Stored in `LayeredTimeline::last_received_wal` and reported through the
/// timeline detail mgmt API. Unlike the API-level fields, none of these are
/// optional: the whole struct is absent (`None`) until something is received.
pub struct WalReceiverInfo {
|
||||
/// Connection string of the WAL source the message was received from.
pub wal_source_connstr: String,
|
||||
/// LSN of the last received message.
pub last_received_msg_lsn: Lsn,
|
||||
/// Time the last message was handled, in microseconds since the UNIX epoch.
pub last_received_msg_ts: u128,
|
||||
}
|
||||
|
||||
/// Inherit all the functions from DatadirTimeline, to provide the
|
||||
@@ -605,6 +616,8 @@ impl LayeredTimeline {
|
||||
current_logical_size: AtomicIsize::new(0),
|
||||
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
repartition_threshold: 0,
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
};
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
|
||||
@@ -26,7 +26,6 @@ mod walreceiver_connection;
|
||||
use anyhow::{ensure, Context};
|
||||
use etcd_broker::Client;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::cell::Cell;
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::future::Future;
|
||||
@@ -36,14 +35,13 @@ use std::thread_local;
|
||||
use std::time::Duration;
|
||||
use tokio::{
|
||||
select,
|
||||
sync::{mpsc, watch, RwLock},
|
||||
sync::{mpsc, watch},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tracing::*;
|
||||
use url::Url;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::http::models::WalReceiverEntry;
|
||||
use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState};
|
||||
use crate::thread_mgr::{self, ThreadKind};
|
||||
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
@@ -55,23 +53,6 @@ thread_local! {
|
||||
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
|
||||
}
|
||||
|
||||
/// WAL receiver state for sharing with the outside world.
|
||||
/// Only entries for timelines currently available in pageserver are stored.
|
||||
static WAL_RECEIVER_ENTRIES: Lazy<RwLock<HashMap<ZTenantTimelineId, WalReceiverEntry>>> =
|
||||
Lazy::new(|| RwLock::new(HashMap::new()));
|
||||
|
||||
/// Gets the public WAL streaming entry for a certain timeline.
///
/// Takes the read lock on `WAL_RECEIVER_ENTRIES`, looks the entry up by the
/// combined tenant/timeline id, and returns a clone of it. Returns `None`
/// when the map holds no entry for that pair.
|
||||
pub async fn get_wal_receiver_entry(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> Option<WalReceiverEntry> {
|
||||
WAL_RECEIVER_ENTRIES
|
||||
.read()
|
||||
.await
|
||||
.get(&ZTenantTimelineId::new(tenant_id, timeline_id))
|
||||
// Clone so the read guard is released before the value is handed out.
.cloned()
|
||||
}
|
||||
|
||||
/// Sets up the main WAL receiver thread that manages the rest of the subtasks inside of it, per timeline.
|
||||
/// See comments in [`wal_receiver_main_thread_loop_step`] for more details on per timeline activities.
|
||||
pub fn init_wal_receiver_main_thread(
|
||||
@@ -281,13 +262,10 @@ async fn wal_receiver_main_thread_loop_step<'a>(
|
||||
}
|
||||
None => warn!("Timeline {id} does not have a tenant entry in wal receiver main thread"),
|
||||
};
|
||||
{
|
||||
WAL_RECEIVER_ENTRIES.write().await.remove(&id);
|
||||
if let Err(e) = join_confirmation_sender.send(()) {
|
||||
warn!("cannot send wal_receiver shutdown confirmation {e}")
|
||||
} else {
|
||||
info!("confirm walreceiver shutdown for {id}");
|
||||
}
|
||||
if let Err(e) = join_confirmation_sender.send(()) {
|
||||
warn!("cannot send wal_receiver shutdown confirmation {e}")
|
||||
} else {
|
||||
info!("confirm walreceiver shutdown for {id}");
|
||||
}
|
||||
}
|
||||
// Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
|
||||
@@ -322,17 +300,6 @@ async fn wal_receiver_main_thread_loop_step<'a>(
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
WAL_RECEIVER_ENTRIES.write().await.insert(
|
||||
id,
|
||||
WalReceiverEntry {
|
||||
wal_source_connstr: None,
|
||||
last_received_msg_lsn: None,
|
||||
last_received_msg_ts: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
vacant_connection_manager_entry.insert(
|
||||
connection_manager::spawn_connection_manager_task(
|
||||
id,
|
||||
|
||||
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||
|
||||
use super::TaskEvent;
|
||||
use crate::{
|
||||
http::models::WalReceiverEntry,
|
||||
layered_repository::WalReceiverInfo,
|
||||
pgdatadir_mapping::DatadirTimeline,
|
||||
repository::{Repository, Timeline},
|
||||
tenant_mgr,
|
||||
@@ -232,21 +232,16 @@ pub async fn handle_walreceiver_connection(
|
||||
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
|
||||
let ts = SystemTime::now();
|
||||
|
||||
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
|
||||
{
|
||||
super::WAL_RECEIVER_ENTRIES.write().await.insert(
|
||||
id,
|
||||
WalReceiverEntry {
|
||||
wal_source_connstr: Some(wal_source_connstr.to_owned()),
|
||||
last_received_msg_lsn: Some(last_lsn),
|
||||
last_received_msg_ts: Some(
|
||||
ts.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("Received message time should be before UNIX EPOCH!")
|
||||
.as_micros(),
|
||||
),
|
||||
},
|
||||
);
|
||||
}
|
||||
// Update the status about what we just received. This is shown in the mgmt API.
|
||||
// Snapshot of what was just received; it is stored on the timeline below.
// `duration_since(UNIX_EPOCH)` fails only when `ts` is *earlier* than the
// epoch, so the panic message states the invariant that was violated:
// the receive time must be after the epoch.
let last_received_wal = WalReceiverInfo {
|
||||
wal_source_connstr: wal_source_connstr.to_owned(),
|
||||
last_received_msg_lsn: last_lsn,
|
||||
// Microseconds since the UNIX epoch, taken from the local clock (`ts`).
last_received_msg_ts: ts
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("Received message time should be after UNIX EPOCH!")
|
||||
.as_micros(),
|
||||
};
|
||||
*timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
|
||||
|
||||
// Send zenith feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
|
||||
@@ -47,7 +47,8 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):
|
||||
for timeline in timelines:
|
||||
timeline_id_str = str(timeline['timeline_id'])
|
||||
timeline_details = client.timeline_detail(tenant_id=tenant_id,
|
||||
timeline_id=UUID(timeline_id_str))
|
||||
timeline_id=UUID(timeline_id_str),
|
||||
include_non_incremental_logical_size=True)
|
||||
|
||||
assert timeline_details['tenant_id'] == tenant_id.hex
|
||||
assert timeline_details['timeline_id'] == timeline_id_str
|
||||
@@ -63,13 +64,19 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
|
||||
timeline_details = client.timeline_detail(tenant_id=tenant_id,
|
||||
timeline_id=timeline_id,
|
||||
include_non_incremental_logical_size=True)
|
||||
|
||||
assert empty_response.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||
|
||||
|
||||
# Test the WAL-receiver related fields in the response to `timeline_details` API call
|
||||
#
|
||||
# These fields used to be returned by a separate API call, but they're part of
|
||||
# `timeline_details` now.
|
||||
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
client = env.pageserver.http_client()
|
||||
@@ -78,18 +85,17 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
|
||||
|
||||
def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int:
|
||||
res = client.wal_receiver_get(tenant_id, timeline_id)
|
||||
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
|
||||
|
||||
# a successful `wal_receiver_get` response must contain the below fields
|
||||
assert list(res.keys()) == [
|
||||
"wal_source_connstr",
|
||||
"last_received_msg_lsn",
|
||||
"last_received_msg_ts",
|
||||
]
|
||||
# a successful `timeline_details` response must contain the below fields
|
||||
local_timeline_details = timeline_details['local']
|
||||
assert "wal_source_connstr" in local_timeline_details.keys()
|
||||
assert "last_received_msg_lsn" in local_timeline_details.keys()
|
||||
assert "last_received_msg_ts" in local_timeline_details.keys()
|
||||
|
||||
assert res["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
|
||||
assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
|
||||
|
||||
last_msg_lsn = lsn_from_hex(res["last_received_msg_lsn"])
|
||||
last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
|
||||
assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
|
||||
f"the last received message's LSN {last_msg_lsn} hasn't been updated \
|
||||
compared to the previous message's LSN {prev_msg_lsn}"
|
||||
@@ -98,7 +104,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
|
||||
# Wait to make sure that we get a latest WAL receiver data.
|
||||
# We need to wait here because it's possible that we don't have access to
|
||||
# the latest WAL during the time the `wal_receiver_get` API is called.
|
||||
# the latest WAL yet, when the `timeline_detail` API is first called.
|
||||
# See: https://github.com/neondatabase/neon/issues/1768.
|
||||
lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None))
|
||||
|
||||
|
||||
@@ -865,10 +865,24 @@ class NeonPageserverHttpClient(requests.Session):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
|
||||
def timeline_detail(self,
|
||||
tenant_id: uuid.UUID,
|
||||
timeline_id: uuid.UUID,
|
||||
include_non_incremental_logical_size: bool = False,
|
||||
include_non_incremental_physical_size: bool = False) -> Dict[Any, Any]:
|
||||
|
||||
include_non_incremental_logical_size_str = "0"
|
||||
if include_non_incremental_logical_size:
|
||||
include_non_incremental_logical_size_str = "1"
|
||||
|
||||
include_non_incremental_physical_size_str = "0"
|
||||
if include_non_incremental_physical_size:
|
||||
include_non_incremental_physical_size_str = "1"
|
||||
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" +
|
||||
"?include-non-incremental-logical-size=1&include-non-incremental-physical-size=1")
|
||||
"?include-non-incremental-logical-size={include_non_incremental_logical_size_str}" +
|
||||
"&include-non-incremental-physical-size={include_non_incremental_physical_size_str}")
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
@@ -882,15 +896,6 @@ class NeonPageserverHttpClient(requests.Session):
|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
# Fetch the WAL receiver entry of one timeline from the pageserver's
# `/v1/tenant/<tenant>/timeline/<timeline>/wal_receiver` endpoint and return
# the decoded JSON body, which must be a JSON object (dict).
def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"
|
||||
)
|
||||
# NOTE(review): presumably raises on a non-success HTTP status — confirm in verbose_error.
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def get_metrics(self) -> str:
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
self.verbose_error(res)
|
||||
|
||||
Reference in New Issue
Block a user