Introduce first version of tenant migraiton between pageservers

This patch includes attach/detach http endpoints in pageservers. Some
changes in callmemaybe handling inside safekeeper and an integrational
test to check migration with and without load. There are still some
rough edges that will be addressed in follow up patches
This commit is contained in:
Dmitry Rodionov
2022-01-13 01:48:06 +03:00
committed by Dmitry Rodionov
parent 81e94d1897
commit 37c440c5d3
17 changed files with 543 additions and 82 deletions

View File

@@ -0,0 +1,22 @@
## Pageserver tenant migration
### Overview
This feature allows to migrate a timeline from one pageserver to another by utilizing remote storage capability.
### Migration process
Pageserver implements two new http handlers: timeline attach and timeline detach.
Timeline migration is performed in a following way:
1. Timeline attach is called on a target pageserver. This asks pageserver to download latest checkpoint uploaded to s3.
2. For now it is necessary to manually initialize replication stream via callmemaybe call so target pageserver initializes replication from safekeeper (it is desired to avoid this and initialize replication directly in attach handler, but this requires some refactoring (probably [#997](https://github.com/zenithdb/zenith/issues/997)/[#1049](https://github.com/zenithdb/zenith/issues/1049))
3. Replication state can be tracked via timeline detail pageserver call.
4. Compute node should be restarted with new pageserver connection string. Issue with multiple compute nodes for one timeline is handled on the safekeeper consensus level. So this is not a problem here.Currently responsibility for rescheduling the compute with updated config lies on external coordinator (console).
5. Timeline is detached from old pageserver. On disk data is removed.
### Implementation details
Now safekeeper needs to track which pageserver it is replicating to. This introduces complications into replication code:
* We need to distinguish different pageservers (now this is done by connection string which is imperfect and is covered here: https://github.com/zenithdb/zenith/issues/1105). Callmemaybe subscription management also needs to track that (this is already implemented).
* We need to track which pageserver is the primary. This is needed to avoid reconnections to non primary pageservers. Because we shouldn't reconnect to them when they decide to stop their walreceiver. I e this can appear when there is a load on the compute and we are trying to detach timeline from old pageserver. In this case callmemaybe will try to reconnect to it because replication termination condition is not met (page server with active compute could never catch up to the latest lsn, so there is always some wal tail)

View File

@@ -25,6 +25,7 @@ use zenith_utils::zid::{opt_display_serde, ZTimelineId};
use super::models::BranchCreateRequest;
use super::models::TenantCreateRequest;
use crate::branches::BranchInfo;
use crate::repository::RepositoryTimeline;
use crate::repository::TimelineSyncState;
use crate::{branches, config::PageServerConf, tenant_mgr, ZTenantId};
@@ -247,6 +248,58 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
Ok(json_response(StatusCode::OK, response_data)?)
}
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
tokio::task::spawn_blocking(move || {
let _enter =
info_span!("timeline_attach_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(_) => {
anyhow::bail!("Timeline with id {} is already local", timeline_id)
}
RepositoryTimeline::Remote {
id: _,
disk_consistent_lsn: _,
} => {
// FIXME (rodionov) get timeline already schedules timeline for download, and duplicate tasks can cause errors
// first should be fixed in https://github.com/zenithdb/zenith/issues/997
// TODO (rodionov) change timeline state to awaits download (incapsulate it somewhere in the repo)
// TODO (rodionov) can we safely request replication on the timeline before sync is completed? (can be implemented on top of the #997)
Ok(())
}
}
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::ACCEPTED, ())?)
}
async fn timeline_detach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
tokio::task::spawn_blocking(move || {
let _enter =
info_span!("timeline_detach_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
repo.detach_timeline(timeline_id)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, ())?)
}
async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
// check for management permission
check_permission(&request, None)?;
@@ -267,13 +320,13 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let response_data = tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = %request_data.tenant_id).entered();
tenant_mgr::create_repository_for_tenant(get_config(&request), request_data.tenant_id)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::CREATED, response_data)?)
Ok(json_response(StatusCode::CREATED, ())?)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -308,6 +361,14 @@ pub fn make_router(
"/v1/timeline/:tenant_id/:timeline_id",
timeline_detail_handler,
)
.post(
"/v1/timeline/:tenant_id/:timeline_id/attach",
timeline_attach_handler,
)
.post(
"/v1/timeline/:tenant_id/:timeline_id/detach",
timeline_detach_handler,
)
.get("/v1/branch/:tenant_id", branch_list_handler)
.get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler)
.post("/v1/branch", branch_create_handler)

View File

@@ -285,7 +285,46 @@ impl Repository for LayeredRepository {
Ok(())
}
// TODO this method currently does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// Detaches the timeline from the repository.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
let mut timelines = self.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Vacant(_) => {
bail!("cannot detach non existing timeline");
}
Entry::Occupied(mut entry) => {
let timeline_entry = entry.get_mut();
let timeline = match timeline_entry {
LayeredTimelineEntry::Remote { .. } => {
bail!("cannot detach remote timeline {}", timeline_id);
}
LayeredTimelineEntry::Local(timeline) => timeline,
};
// TODO (rodionov) keep local state in timeline itself (refactoring related to https://github.com/zenithdb/zenith/issues/997 and #1104)
// FIXME this is local disk consistent lsn, need to keep the latest succesfully uploaded checkpoint lsn in timeline (metadata?)
// https://github.com/zenithdb/zenith/issues/1104
let remote_disk_consistent_lsn = timeline.disk_consistent_lsn.load();
// reference to timeline is dropped here
entry.insert(LayeredTimelineEntry::Remote {
id: timeline_id,
disk_consistent_lsn: remote_disk_consistent_lsn,
});
}
};
// Release the lock to shutdown and remove the files without holding it
drop(timelines);
// shutdown the timeline (this shuts down the walreceiver)
thread_mgr::shutdown_threads(None, Some(self.tenantid), Some(timeline_id));
// remove timeline files (maybe avoid this for ease of debugging if something goes wrong)
fs::remove_dir_all(self.conf.timeline_path(&timeline_id, &self.tenantid))?;
Ok(())
}
// TODO this method currentlly does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// Sync task is enqueued and can error and be rescheduled, so some significant time may pass between the events.
//
/// Reacts on the timeline sync state change, changing pageserver's memory state for this timeline (unload or load of the timeline files).
@@ -294,6 +333,10 @@ impl Repository for LayeredRepository {
timeline_id: ZTimelineId,
new_state: TimelineSyncState,
) -> Result<()> {
debug!(
"set_timeline_state: timeline_id: {}, new_state: {:?}",
timeline_id, new_state
);
let mut timelines_accessor = self.timelines.lock().unwrap();
match new_state {
@@ -314,6 +357,7 @@ impl Repository for LayeredRepository {
},
),
};
// NOTE we do not delete local data in case timeline became cloud only, this is performed in detach_timeline
drop(timelines_accessor);
Ok(())

View File

@@ -73,7 +73,7 @@ impl RemoteStorage for LocalFs {
}
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
Ok(get_all_files(&self.root).await?.into_iter().collect())
get_all_files(&self.root).await
}
async fn upload(

View File

@@ -333,6 +333,10 @@ pub fn schedule_timeline_checkpoint_upload(
///
/// Ensure that the loop is started otherwise the task is never processed.
pub fn schedule_timeline_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
debug!(
"Scheduling timeline download for tenant {}, timeline {}",
tenant_id, timeline_id
);
sync_queue::push(SyncTask::new(
TimelineSyncId(tenant_id, timeline_id),
0,

View File

@@ -19,6 +19,8 @@ pub type BlockNumber = u32;
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Updates timeline based on the new sync state, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn set_timeline_state(
@@ -242,7 +244,7 @@ pub trait Timeline: Send + Sync {
fn get_current_logical_size(&self) -> usize;
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure thet incremental and non incremental variants match.
/// Used in tests to ensure that incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
/// An escape hatch to allow "casting" a generic Timeline to LayeredTimeline.

View File

@@ -73,6 +73,7 @@ pub fn set_timeline_states(
let mut m = access_tenants();
for (tenant_id, timeline_states) in timeline_states {
let tenant = m.entry(tenant_id).or_insert_with(|| {
// TODO (rodionov) reuse one of the initialisation routines
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);

View File

@@ -65,6 +65,7 @@ pub fn launch_wal_receiver(
match receivers.get_mut(&(tenantid, timelineid)) {
Some(receiver) => {
info!("wal receiver already running, updating connection string");
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {

View File

@@ -100,7 +100,7 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
timelines = client.timeline_list(tenant_id)
assert len(timelines) > 0
for timeline_id_str in timelines:
timeline_details = client.timeline_details(tenant_id.hex, timeline_id_str)
timeline_details = client.timeline_detail(tenant_id, UUID(timeline_id_str))
assert timeline_details['type'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str

View File

@@ -4,6 +4,7 @@
import time, shutil, os
from contextlib import closing
from pathlib import Path
from uuid import UUID
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
import pytest
@@ -77,7 +78,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
client = env.pageserver.http_client()
attempts = 0
while True:
timeline_details = client.timeline_details(tenant_id, timeline_id)
timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['type'] == 'Local':

View File

@@ -0,0 +1,259 @@
from contextlib import closing, contextmanager
import os
import pathlib
import subprocess
import threading
from uuid import UUID
from fixtures.log_helper import log
import time
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir
@contextmanager
def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
pageserver_bin: pathlib.Path,
remote_storage_mock_path: pathlib.Path,
pg_port: int,
http_port: int):
"""
cannot use ZenithPageserver yet because it depends on zenith cli
which currently lacks support for multiple pageservers
"""
cmd = [
str(pageserver_bin),
'--init',
'--workdir',
str(new_pageserver_dir),
f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'",
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
]
subprocess.check_output(cmd, text=True)
# actually run new pageserver
cmd = [
str(pageserver_bin),
'--workdir',
str(new_pageserver_dir),
'--daemonize',
]
log.info("starting new pageserver %s", cmd)
out = subprocess.check_output(cmd, text=True)
log.info("started new pageserver %s", out)
try:
yield
finally:
log.info("stopping new pageserver")
pid = int((new_pageserver_dir / 'pageserver.pid').read_text())
os.kill(pid, signal.SIGQUIT)
def wait_for(number_of_iterations: int, interval: int, func):
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
@contextmanager
def pg_cur(pg):
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
yield cur
def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Event):
log.info("load started")
inserted_ctr = 0
failed = False
while not stop_event.is_set():
try:
with pg_cur(pg) as cur:
cur.execute("INSERT INTO load VALUES ('some payload')")
inserted_ctr += 1
except:
if not failed:
log.info("load failed")
failed = True
load_ok_event.clear()
else:
if failed:
with pg_cur(pg) as cur:
# if we recovered after failure verify that we have correct number of rows
log.info("recovering at %s", inserted_ctr)
cur.execute("SELECT count(*) FROM load")
assert cur.fetchone()[0] == inserted_ctr
log.info("successfully recovered %s", inserted_ctr)
failed = False
load_ok_event.set()
log.info('load thread stopped')
def assert_local(pageserver_http_client: ZenithPageserverHttpClient, tenant: str, timeline: str):
timeline_detail = pageserver_http_client.timeline_detail(UUID(tenant), UUID(timeline))
assert timeline_detail.get('type') == "Local", timeline_detail
return timeline_detail
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
port_distributor: PortDistributor,
with_load: str):
zenith_env_builder.num_safekeepers = 1
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init()
# create folder for remote storage mock
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'
tenant = env.create_tenant("74ee8b079a0e437eb0afea7d26a07209")
log.info("tenant to relocate %s", tenant)
env.zenith_cli(["branch", "test_tenant_relocation", "main", f"--tenantid={tenant}"])
tenant_pg = env.postgres.create_start(
"test_tenant_relocation",
"main", # branch name, None means same as node name
tenant_id=tenant,
)
# insert some data
with closing(tenant_pg.connect()) as conn:
with conn.cursor() as cur:
# save timeline for later gc call
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
log.info("timeline to relocate %s", timeline)
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
if with_load == 'with_load':
# create load table
with pg_cur(tenant_pg) as cur:
cur.execute("CREATE TABLE load(value text)")
load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(target=load,
args=(tenant_pg, load_stop_event, load_ok_event))
load_thread.start()
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant} {timeline}")
# ensure upload is completed
pageserver_http_client = env.pageserver.http_client()
timeline_detail = pageserver_http_client.timeline_detail(UUID(tenant), UUID(timeline))
assert timeline_detail['disk_consistent_lsn'] == timeline_detail['timeline_state']['Ready']
log.info("inititalizing new pageserver")
# bootstrap second pageserver
new_pageserver_dir = env.repo_dir / 'new_pageserver'
new_pageserver_dir.mkdir()
new_pageserver_pg_port = port_distributor.get_port()
new_pageserver_http_port = port_distributor.get_port()
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = pathlib.Path(zenith_binpath) / 'pageserver'
new_pageserver_http_client = ZenithPageserverHttpClient(port=new_pageserver_http_port,
auth_token=None)
with new_pageserver_helper(new_pageserver_dir,
pageserver_bin,
remote_storage_mock_path,
new_pageserver_pg_port,
new_pageserver_http_port):
# call to attach timeline to new timeline
new_pageserver_http_client.timeline_attach(UUID(tenant), UUID(timeline))
# FIXME cannot handle duplicate download requests, subject to fix in https://github.com/zenithdb/zenith/issues/997
time.sleep(5)
# new pageserver should in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_for(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http_client, tenant, timeline))
assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail
# when load is active these checks can break because lsns are not static
if with_load == 'without_load':
assert new_timeline_detail['disk_consistent_lsn'] == timeline_detail[
'disk_consistent_lsn']
assert new_timeline_detail['timeline_state']['Ready'] == timeline_detail[
'timeline_state']['Ready']
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
# needs to be streamed to the new pageserver
# TODO (rodionov) use attach to start replication
with closing(PgProtocol(host='localhost',
port=new_pageserver_pg_port).connect()) as new_pageserver_pg:
with new_pageserver_pg.cursor() as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))
tenant_pg.stop()
# rewrite zenith cli config to use new pageserver for basebackup to start new compute
cli_config_lines = (env.repo_dir / 'config').read_text().splitlines()
cli_config_lines[-2] = f"listen_http_addr = 'localhost:{new_pageserver_http_port}'"
cli_config_lines[-1] = f"listen_pg_addr = 'localhost:{new_pageserver_pg_port}'"
(env.repo_dir / 'config').write_text('\n'.join(cli_config_lines))
tenant_pg_config_file_path = pathlib.Path(tenant_pg.config_file_path())
tenant_pg_config_file_path.open('a').write(
f"\nzenith.page_server_connstring = 'postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
)
tenant_pg.start()
# detach tenant from old pageserver before we check
# that all the data is there to be sure that old pageserver
# is no longer involved, and if it is, we will see the errors
pageserver_http_client.timeline_detach(UUID(tenant), UUID(timeline))
with closing(tenant_pg.connect()) as conn:
with conn.cursor() as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
# check that we can write new data
cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (2001000, )
if with_load == 'with_load':
assert load_ok_event.is_set()
log.info('stopping load thread')
load_stop_event.set()
load_thread.join()
log.info('load thread stopped')
# bring old pageserver back for clean shutdown via zenith cli
# new pageserver will be shut down by the context manager
cli_config_lines = (env.repo_dir / 'config').read_text().splitlines()
cli_config_lines[-2] = f"listen_http_addr = 'localhost:{env.pageserver.service_port.http}'"
cli_config_lines[-1] = f"listen_pg_addr = 'localhost:{env.pageserver.service_port.pg}'"
(env.repo_dir / 'config').write_text('\n'.join(cli_config_lines))

View File

@@ -725,6 +725,16 @@ class ZenithPageserverHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def timeline_attach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.post(
f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}/attach", )
res.raise_for_status()
def timeline_detach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.post(
f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}/detach", )
res.raise_for_status()
def branch_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/branch/{tenant_id.hex}")
res.raise_for_status()
@@ -777,8 +787,9 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, list)
return res_json
def timeline_details(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.get(
f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)

View File

@@ -7,6 +7,7 @@
//!
use crate::SafeKeeperConf;
use anyhow::{Context, Result};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
@@ -41,9 +42,12 @@ async fn request_callback(
let me_conf: postgres::config::Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
// pageserver connstr is needed to be able to distinguish between different pageservers
// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved
// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105
let callme = format!(
"callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
tenantid, timelineid, host, port, timelineid, tenantid
"callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={} pageserver_connstr={}'",
tenantid, timelineid, host, port, timelineid, tenantid, pageserver_connstr,
);
let _ = client.simple_query(&callme).await?;
@@ -66,12 +70,13 @@ pub enum CallmeEvent {
// add new subscription to the list
Subscribe(ZTenantId, ZTimelineId, String),
// remove the subscription from the list
Unsubscribe(ZTenantId, ZTimelineId),
Unsubscribe(ZTenantId, ZTimelineId, String),
// don't serve this subscription, but keep it in the list
Pause(ZTenantId, ZTimelineId),
Pause(ZTenantId, ZTimelineId, String),
// resume this subscription, if it exists,
// but don't create a new one if it is gone
Resume(ZTenantId, ZTimelineId),
Resume(ZTenantId, ZTimelineId, String),
// TODO how do we delete from subscriptions?
}
#[derive(Debug)]
@@ -117,6 +122,7 @@ impl SubscriptionState {
let timelineid = self.timelineid;
let tenantid = self.tenantid;
let pageserver_connstr = self.pageserver_connstr.clone();
tokio::spawn(async move {
if let Err(err) = handle.await {
if err.is_cancelled() {
@@ -124,8 +130,8 @@ impl SubscriptionState {
timelineid, tenantid);
} else {
error!(
"callback task for timelineid={} tenantid={} failed: {}",
timelineid, tenantid, err
"callback task for timelineid={} tenantid={} pageserver_connstr={} failed: {}",
timelineid, tenantid, pageserver_connstr, err
);
}
}
@@ -137,7 +143,7 @@ impl SubscriptionState {
// Ignore call request if this subscription is paused
if self.paused {
debug!(
"ignore call request for paused subscription
"ignore call request for paused subscription \
tenantid: {}, timelineid: {}",
self.tenantid, self.timelineid
);
@@ -147,7 +153,7 @@ impl SubscriptionState {
// Check if it too early to recall
if self.handle.is_some() && self.last_call_time.elapsed() < recall_period {
debug!(
"too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?}
"too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?} \
tenantid: {}, timelineid: {}",
self.last_call_time, recall_period, self.tenantid, self.timelineid
);
@@ -175,8 +181,7 @@ impl SubscriptionState {
// Update last_call_time
self.last_call_time = Instant::now();
info!(
"new call spawned. time {:?}
tenantid: {}, timelineid: {}",
"new call spawned. last call time {:?} tenantid: {}, timelineid: {}",
self.last_call_time, self.tenantid, self.timelineid
);
}
@@ -189,7 +194,7 @@ impl Drop for SubscriptionState {
}
pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId), SubscriptionState>> =
let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId, String), SubscriptionState>> =
Mutex::new(HashMap::new());
let mut ticker = tokio::time::interval(conf.recall_period);
@@ -201,50 +206,73 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
{
CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
{
let _enter = info_span!("callmemaybe: subscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
let mut subscriptions = subscriptions.lock().unwrap();
if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
{
info!("callmemaybe. subscription already exists {:?}", sub);
match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) {
Entry::Occupied(_) => {
// Do nothing if subscription already exists
// If it is paused it means that there is already established replication connection.
// If it is not paused it will be polled with other subscriptions when timeout expires.
// This can occur when replication channel is established before subscription is added.
info!(
"subscription already exists",
);
}
Entry::Vacant(entry) => {
let subscription = entry.insert(SubscriptionState::new(
tenantid,
timelineid,
pageserver_connstr,
));
subscription.call(conf.recall_period, conf.listen_pg_addr.clone());
}
}
if let Some(mut sub) = subscriptions.insert((tenantid, timelineid),
SubscriptionState::new(tenantid, timelineid, pageserver_connstr))
{
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
},
CallmeEvent::Unsubscribe(tenantid, timelineid, pageserver_connstr) => {
let _enter = debug_span!("callmemaybe: unsubscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
debug!("unsubscribe");
let mut subscriptions = subscriptions.lock().unwrap();
subscriptions.remove(&(tenantid, timelineid, pageserver_connstr));
},
CallmeEvent::Pause(tenantid, timelineid, pageserver_connstr) => {
let _enter = debug_span!("callmemaybe: pause", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered();
let mut subscriptions = subscriptions.lock().unwrap();
// If pause received when no corresponding subscription exists it means that someone started replication
// without using callmemaybe. So we create subscription and pause it.
// In tenant relocation scenario subscribe call will be executed after pause when compute is restarted.
// In that case there is no need to create new/unpause existing subscription.
match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) {
Entry::Occupied(mut sub) => {
debug!("pause existing");
sub.get_mut().pause();
}
Entry::Vacant(entry) => {
debug!("create paused");
let subscription = entry.insert(SubscriptionState::new(
tenantid,
timelineid,
pageserver_connstr,
));
subscription.pause();
}
}
info!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
timelineid, tenantid);
},
CallmeEvent::Unsubscribe(tenantid, timelineid) => {
CallmeEvent::Resume(tenantid, timelineid, pageserver_connstr) => {
debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={} pageserver_connstr={}",
timelineid, tenantid, pageserver_connstr);
let mut subscriptions = subscriptions.lock().unwrap();
subscriptions.remove(&(tenantid, timelineid));
info!("callmemaybe. thread_main. unsubscribe callback. request for timelineid={} tenantid={}",
timelineid, tenantid);
},
CallmeEvent::Pause(tenantid, timelineid) => {
let mut subscriptions = subscriptions.lock().unwrap();
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
{
sub.pause();
};
info!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
timelineid, tenantid);
},
CallmeEvent::Resume(tenantid, timelineid) => {
let mut subscriptions = subscriptions.lock().unwrap();
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid, pageserver_connstr))
{
sub.resume();
};
info!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
timelineid, tenantid);
},
}
},
_ = ticker.tick() => {
let mut subscriptions = subscriptions.lock().unwrap();
for (&(_tenantid, _timelineid), state) in subscriptions.iter_mut() {
for (&(_tenantid, _timelineid, _), state) in subscriptions.iter_mut() {
state.call(conf.recall_period, conf.listen_pg_addr.clone());
}
},

View File

@@ -30,6 +30,7 @@ pub struct SafekeeperPostgresHandler {
pub ztenantid: Option<ZTenantId>,
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
pageserver_connstr: Option<String>,
//sender to communicate with callmemaybe thread
pub tx: UnboundedSender<CallmeEvent>,
}
@@ -88,6 +89,8 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
self.appname = Some(app_name.clone());
}
self.pageserver_connstr = params.get("pageserver_connstr").cloned();
Ok(())
} else {
bail!("Walkeeper received unexpected initial message: {:?}", sm);
@@ -127,7 +130,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
}
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb)
.run(self, pgb, start_lsn)
.run(self, pgb, start_lsn, self.pageserver_connstr.clone())
.context("failed to run ReplicationConn")?;
}
SafekeeperPostgresCommand::IdentifySystem => {
@@ -149,6 +152,7 @@ impl SafekeeperPostgresHandler {
ztenantid: None,
ztimelineid: None,
timeline: None,
pageserver_connstr: None,
tx,
}
}

View File

@@ -18,10 +18,9 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::timeline::TimelineTools;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::ZTenantId;
use crate::callmemaybe::CallmeEvent;
use tokio::sync::mpsc::UnboundedSender;
pub struct ReceiveWalConn<'pg> {
/// Postgres connection
@@ -106,13 +105,11 @@ impl<'pg> ReceiveWalConn<'pg> {
// Add far as replication in postgres is initiated by receiver
// we should use callmemaybe mechanism.
let timelineid = spg.timeline.get().timelineid;
let tx_clone = spg.tx.clone();
let pageserver_connstr = pageserver_connstr.to_owned();
spg.tx
.send(CallmeEvent::Subscribe(
tenant_id,
timelineid,
pageserver_connstr,
pageserver_connstr.to_owned(),
))
.unwrap_or_else(|e| {
error!(
@@ -123,9 +120,6 @@ impl<'pg> ReceiveWalConn<'pg> {
// create a guard to unsubscribe callback, when this wal_stream will exit
Some(SendWalHandlerGuard {
_tx: tx_clone,
_tenant_id: tenant_id,
_timelineid: timelineid,
timeline: Arc::clone(spg.timeline.get()),
})
}
@@ -147,22 +141,11 @@ impl<'pg> ReceiveWalConn<'pg> {
}
struct SendWalHandlerGuard {
_tx: UnboundedSender<CallmeEvent>,
_tenant_id: ZTenantId,
_timelineid: ZTimelineId,
timeline: Arc<Timeline>,
}
impl Drop for SendWalHandlerGuard {
fn drop(&mut self) {
self.timeline.stop_streaming();
// self.tx
// .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
// .unwrap_or_else(|e| {
// error!(
// "failed to send Unsubscribe request to callmemaybe thread {}",
// e
// );
// });
}
}

View File

@@ -91,17 +91,30 @@ struct ReplicationStreamGuard {
tx: UnboundedSender<CallmeEvent>,
tenant_id: ZTenantId,
timelineid: ZTimelineId,
pageserver_connstr: String,
}
impl Drop for ReplicationStreamGuard {
fn drop(&mut self) {
// the connection with pageserver is lost,
// resume callback subscription
debug!("Connection to pageserver is gone. Resume callmemeybe subsciption if necessary. tenantid {} timelineid {}",
self.tenant_id, self.timelineid);
debug!(
"Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}",
self.tenant_id, self.timeline_id,
);
let subscription_key = SubscriptionStateKey::new(
self.tenant_id,
self.timeline_id,
self.pageserver_connstr.to_owned(),
);
self.tx
.send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
.send(CallmeEvent::Resume(
self.tenant_id,
self.timelineid,
self.pageserver_connstr.to_owned(),
))
.unwrap_or_else(|e| {
error!("failed to send Resume request to callmemaybe thread {}", e);
});
@@ -194,8 +207,9 @@ impl ReplicationConn {
spg: &mut SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
mut start_pos: Lsn,
pageserver_connstr: Option<String>,
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());
@@ -256,11 +270,16 @@ impl ReplicationConn {
if spg.appname == Some("wal_proposer_recovery".to_string()) {
None
} else {
let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let timelineid = spg.timeline.get().timelineid;
let tenant_id = spg.ztenantid.unwrap();
let tx_clone = spg.tx.clone();
spg.tx
.send(CallmeEvent::Pause(tenant_id, timelineid))
.send(CallmeEvent::Pause(
tenant_id,
timelineid,
pageserver_connstr.clone(),
))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
@@ -270,6 +289,7 @@ impl ReplicationConn {
tx: tx_clone,
tenant_id,
timelineid,
pageserver_connstr,
})
}
};
@@ -293,12 +313,20 @@ impl ReplicationConn {
if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// Is is time to end streaming to this replica?
// Is it time to end streaming to this replica?
if spg.timeline.get().check_stop_streaming(replica_id) {
let timelineid = spg.timeline.get().timelineid;
// this expect should never fail because in wal_proposer_recovery mode stop_pos is set
// and this code is not reachable
let pageserver_connstr = pageserver_connstr
.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let timelineid = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
spg.tx
.send(CallmeEvent::Unsubscribe(tenant_id, timelineid))
.send(CallmeEvent::Unsubscribe(
tenant_id,
timelineid,
pageserver_connstr,
))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});

View File

@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
// Zenith ID is a 128-bit random ID.
// Used to represent various identifiers. Provides handy utility methods and impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct ZId([u8; 16]);
impl ZId {
@@ -64,6 +64,12 @@ impl fmt::Display for ZId {
}
}
impl fmt::Debug for ZId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}
macro_rules! zid_newtype {
($t:ident) => {
impl $t {
@@ -118,6 +124,12 @@ macro_rules! zid_newtype {
self.0.fmt(f)
}
}
impl fmt::Debug for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
};
}
@@ -143,14 +155,14 @@ macro_rules! zid_newtype {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);
// Zenith Tenant Id represents identifiar of a particular tenant.
// Is used for distinguishing requests and data belonging to different users.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);