From 6943dac164ce54f73c61eab04123ead0c1da9de6 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 5 Jul 2023 15:47:13 +0300 Subject: [PATCH] Support cloning database without index rebuild --- pgxn/neon/neon--1.0.sql | 4 + pgxn/neon/neon.c | 241 ++++++++++++++++++++++++++++++++++++++++ vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- 4 files changed, 247 insertions(+), 2 deletions(-) diff --git a/pgxn/neon/neon--1.0.sql b/pgxn/neon/neon--1.0.sql index 6cf111ea6a..a477683361 100644 --- a/pgxn/neon/neon--1.0.sql +++ b/pgxn/neon/neon--1.0.sql @@ -32,3 +32,7 @@ CREATE VIEW local_cache AS SELECT P.* FROM local_cache_pages() AS P (pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid, relforknumber int2, relblocknumber int8, accesscount int4); + +CREATE FUNCTION copy_from(conninfo cstring) RETURNS BIGINT +AS 'MODULE_PATHNAME', 'copy_from' +LANGUAGE C; diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b45d7cfc32..51fd1f2a5c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -13,20 +13,32 @@ #include "access/xact.h" #include "access/xlog.h" +#include "access/relation.h" +#include "access/xloginsert.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "catalog/pg_type.h" +#include "catalog/namespace.h" #include "replication/walsender.h" #include "funcapi.h" +#include "miscadmin.h" #include "access/htup_details.h" #include "utils/pg_lsn.h" #include "utils/guc.h" +#include "utils/wait_event.h" +#include "utils/rel.h" +#include "utils/varlena.h" +#include "utils/builtins.h" #include "neon.h" #include "walproposer.h" #include "pagestore_client.h" #include "control_plane_connector.h" +#include "libpq-fe.h" +#include "libpq/pqformat.h" +#include "libpq/libpq.h" + PG_MODULE_MAGIC; void _PG_init(void); @@ -46,6 +58,7 @@ _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); +PG_FUNCTION_INFO_V1(copy_from); Datum 
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -91,3 +104,231 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
 {
 	PG_RETURN_UINT64(BackpressureThrottlingTime());
 }
+
+
+/*
+ * Presumably the number of columns in one copydata() result row (relid,
+ * fork, blkno, content) -- TODO confirm; nothing in the visible code
+ * references this macro.
+ */
+#define N_RAW_PAGE_COLUMNS 4
+
+
+/*
+ * Report a libpq failure through ereport() at level 'elevel'.
+ *
+ * res    - result carrying the remote error fields; may be NULL (get_result()
+ *          passes NULL), in which case the message is taken from 'conn'.
+ * conn   - connection whose PQerrorMessage() is used when 'res' supplies no
+ *          primary message.
+ * clear  - if true, 'res' is PQclear()'ed in the PG_FINALLY section, i.e.
+ *          whether or not ereport() returns.
+ * sql    - optional remote command text, appended as error context when not
+ *          NULL.
+ *
+ * The SQLSTATE is rebuilt from the PG_DIAG_SQLSTATE field when present and
+ * defaults to ERRCODE_CONNECTION_FAILURE otherwise.
+ */
+static void
+report_error(int elevel, PGresult *res, PGconn *conn,
+			 bool clear, const char *sql)
+{
+	/* If requested, PGresult must be released before leaving this function. */
+	PG_TRY();
+	{
+		char	   *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+		char	   *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+		char	   *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+		char	   *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+		char	   *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+		int			sqlstate;
+
+		/* Re-encode the five-character remote SQLSTATE, if we got one */
+		if (diag_sqlstate)
+			sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+									 diag_sqlstate[1],
+									 diag_sqlstate[2],
+									 diag_sqlstate[3],
+									 diag_sqlstate[4]);
+		else
+			sqlstate = ERRCODE_CONNECTION_FAILURE;
+
+		/*
+		 * If we don't get a message from the PGresult, try the PGconn. This
+		 * is needed because for connection-level failures, PQexec may just
+		 * return NULL, not a PGresult at all.
+		 */
+		if (message_primary == NULL)
+			message_primary = pchomp(PQerrorMessage(conn));
+
+		/* Each optional field is attached only when it is present */
+		ereport(elevel,
+				(errcode(sqlstate),
+				 (message_primary != NULL && message_primary[0] != '\0') ?
+				 errmsg_internal("%s", message_primary) :
+				 errmsg("could not obtain message string for remote error"),
+				 message_detail ? errdetail_internal("%s", message_detail) : 0,
+				 message_hint ? errhint("%s", message_hint) : 0,
+				 message_context ? errcontext("%s", message_context) : 0,
+				 sql ? errcontext("remote SQL command: %s", sql) : 0));
+	}
+	PG_FINALLY();
+	{
+		if (clear)
+			PQclear(res);
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Collect the outcome of the command currently in progress on 'conn'.
+ *
+ * While libpq reports the connection as busy, waits on the process latch and
+ * the connection socket, checks for interrupts, and consumes input when the
+ * socket becomes readable.  Results are drained with PQgetResult() until it
+ * returns NULL; every result but the last is cleared.
+ *
+ * Returns the last PGresult obtained, or NULL if PQgetResult() returned NULL
+ * on the first call.  A non-NULL return value is not cleared here, so the
+ * caller has to PQclear() it.
+ *
+ * 'query' is only passed to report_error() as context if PQconsumeInput()
+ * fails.
+ */
+static PGresult *
+get_result(PGconn *conn, const char *query)
+{
+	/* volatile: assigned inside PG_TRY and read in PG_CATCH */
+	PGresult   *volatile last_res = NULL;
+
+	/* In what follows, do not leak any PGresults on an error. */
+	PG_TRY();
+	{
+		for (;;)
+		{
+			PGresult   *res;
+
+			while (PQisBusy(conn))
+			{
+				int			wc;
+
+				/* Sleep until there's something to do */
+				wc = WaitLatchOrSocket(MyLatch,
+									   WL_LATCH_SET | WL_SOCKET_READABLE |
+									   WL_EXIT_ON_PM_DEATH,
+									   PQsocket(conn),
+									   -1L, PG_WAIT_EXTENSION);
+				ResetLatch(MyLatch);
+
+				CHECK_FOR_INTERRUPTS();
+
+				/* Data available in socket? */
+				if (wc & WL_SOCKET_READABLE)
+				{
+					if (!PQconsumeInput(conn))
+						report_error(ERROR, NULL, conn, false, query);
+				}
+			}
+
+			res = PQgetResult(conn);
+			if (res == NULL)
+				break;			/* query is complete */
+
+			/* Keep only the most recent result */
+			PQclear(last_res);
+			last_res = res;
+		}
+	}
+	PG_CATCH();
+	{
+		PQclear(last_res);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return last_res;
+}
+
+/*
+ * SQL text that creates (or replaces) the PL/pgSQL function copydata().
+ *
+ * As written, copydata() loops over every pg_class row whose namespace is not
+ * pg_catalog, pg_toast or information_schema.  For each of the forks 'main',
+ * 'vm' and 'fsm' it calls get_raw_page() once per page (page count =
+ * pg_relation_size / block_size) and returns a row (relname, fork, block
+ * number, page content).  When reltoastrelid is non-zero the same is done for
+ * the relations named 'pg_toast.pg_toast_<oid>' and
+ * 'pg_toast.pg_toast_<oid>_index'.
+ *
+ * NOTE(review): relsize is declared "integer" while pg_relation_size() is
+ * documented to return bigint -- looks like this would fail for forks larger
+ * than 2GB; confirm.
+ */
+#define CREATE_COPYDATA_FUNC "\
+create or replace function copydata() returns setof record as $$ \
+declare \
+ relsize integer; \
+ content bytea; \
+ r record; \
+ fork text; \
+ relname text; \
+ pagesize integer; \
+begin \
+ pagesize = (SELECT current_setting('block_size')); \
+ for r in select oid,reltoastrelid from pg_class where relnamespace not in (select oid from pg_namespace where nspname in ('pg_catalog','pg_toast','information_schema')) \
+ loop \
+ relname = r.oid::regclass::text; \
+ foreach fork in array array['main','vm','fsm'] \
+ loop \
+ relsize = (select pg_relation_size(r.oid, fork)); \
+ for p in 1..relsize/pagesize \
+ loop \
+ content = get_raw_page(relname, fork, p-1); \
+ return next row(relname,fork,p-1,content); \
+ end loop; \
+ end loop; \
+ if r.reltoastrelid <> 0 then \
+ foreach relname in array array ['pg_toast.pg_toast_'||r.oid, 'pg_toast.pg_toast_'||r.oid||'_index'] \
+ loop \
+ foreach fork in array array['main','vm','fsm'] \
+ loop \
+ relsize = (select pg_relation_size(relname, fork)); \
+ for p in 1..relsize/pagesize \
+ loop \
+ content = get_raw_page(relname, fork, p-1); \
+ return next row(relname,fork,p-1,content); \
+ end loop; \
+ end loop; \
+ end loop; \
+ end if; \
+ end loop; \
+end; \
+$$ language plpgsql"
+
+/*
+ * copy_from(conninfo cstring) RETURNS bigint
+ *
+ * Clone the user relations of another database page by page, so that indexes
+ * arrive already built.
+ *
+ * Connects to the source database described by 'conninfo', (re)creates the
+ * copydata() helper there (it relies on get_raw_page(), i.e. the pageinspect
+ * extension must already be installed on the source), runs it, and writes
+ * every returned page into the same-named local relation and fork through
+ * the buffer manager, WAL-logging each page as a full-page image.
+ *
+ * Returns the number of pages written.
+ *
+ * Errors are raised with ereport/elog(ERROR) when the argument is NULL, the
+ * connection or a remote command fails, the result has an unexpected shape,
+ * or a page cannot be placed at its expected block number.  The libpq
+ * connection and result are released on every exit path.
+ *
+ * NOTE(review): PQexecParams() hands back the complete result set at once,
+ * so all pages of the source database are held in memory together.
+ * NOTE(review): a local fork that is longer than the source fork is not
+ * truncated; its trailing blocks are left as they were.
+ */
+Datum
+copy_from(PG_FUNCTION_ARGS)
+{
+	char const *conninfo;
+	char const *sql = "select * from copydata() as raw_page(relid text, fork text, blkno integer, content bytea)";
+	PGconn	   *conn;
+	/* volatile: assigned inside PG_TRY and read in PG_FINALLY */
+	PGresult   *volatile res = NULL;
+	Relation	rel = NULL;
+	char const *cur_relname = NULL; /* points into 'res' */
+	ForkNumber	cur_forknum = InvalidForkNumber;
+	BlockNumber rel_size = 0;
+	int64_t		total = 0;
+
+	/* The SQL declaration is not STRICT, so a NULL argument can reach us */
+	if (PG_ARGISNULL(0))
+		ereport(ERROR,
+				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+				 errmsg("conninfo must not be null")));
+	conninfo = PG_GETARG_CSTRING(0);
+
+	/* Connect to the source database */
+	conn = PQconnectdb(conninfo);
+	if (!conn || PQstatus(conn) != CONNECTION_OK)
+	{
+		/* Copy the message into palloc'd memory so conn can be freed first */
+		char	   *msg = pchomp(PQerrorMessage(conn));
+
+		PQfinish(conn);
+		/* conninfo may contain a password, so it is not echoed here */
+		ereport(ERROR,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not connect to the source database"),
+				 errdetail_internal("%s", msg)));
+	}
+
+	PG_TRY();
+	{
+		int			n_tuples;
+
+		/*
+		 * First create the stored procedure (assumes that the pageinspect
+		 * extension is already installed).  'res' is released by PG_FINALLY
+		 * on error, hence clear = false.
+		 */
+		res = PQexec(conn, CREATE_COPYDATA_FUNC);
+		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+			report_error(ERROR, res, conn, false, CREATE_COPYDATA_FUNC);
+		PQclear(res);
+		res = NULL;
+
+		/*
+		 * Get database data in binary format.  PQexecParams() returns only
+		 * after the command has completed, so there is exactly one result to
+		 * process and nothing left to fetch afterwards.
+		 */
+		res = PQexecParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1);
+		if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
+			report_error(ERROR, res, conn, false, sql);
+
+		if (PQnfields(res) != N_RAW_PAGE_COLUMNS)
+			elog(ERROR, "unexpected result from copydata()");
+
+		n_tuples = PQntuples(res);
+		for (int i = 0; i < n_tuples; i++)
+		{
+			char const *relname;
+			ForkNumber	forknum;
+			uint32		raw_blkno;
+			BlockNumber blkno;
+			Buffer		buf;
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Never read past what the remote side actually sent */
+			if (PQgetisnull(res, i, 0) ||
+				PQgetisnull(res, i, 1) ||
+				PQgetlength(res, i, 2) != (int) sizeof(raw_blkno) ||
+				PQgetlength(res, i, 3) != BLCKSZ)
+				elog(ERROR, "malformed row %d in copydata() result", i);
+
+			relname = PQgetvalue(res, i, 0);
+			forknum = forkname_to_number(PQgetvalue(res, i, 1));
+			/* binary int4 arrives in network byte order */
+			memcpy(&raw_blkno, PQgetvalue(res, i, 2), sizeof(raw_blkno));
+			blkno = pg_ntoh32(raw_blkno);
+
+			/* Switch relation whenever the name in the stream changes */
+			if (cur_relname == NULL || strcmp(relname, cur_relname) != 0)
+			{
+				RangeVar   *relrv;
+
+				if (rel)
+					relation_close(rel, AccessExclusiveLock);
+				relrv = makeRangeVarFromNameList(textToQualifiedNameList(cstring_to_text(relname)));
+				rel = relation_openrv(relrv, AccessExclusiveLock);
+				cur_relname = relname;
+				cur_forknum = InvalidForkNumber;
+			}
+
+			/* Size of the local fork as it was before we started writing it */
+			if (forknum != cur_forknum)
+			{
+				rel_size = RelationGetNumberOfBlocksInFork(rel, forknum);
+				cur_forknum = forknum;
+			}
+
+			/* Overwrite an existing block, or extend the fork by one block */
+			buf = ReadBufferExtended(rel, forknum,
+									 blkno < rel_size ? blkno : P_NEW,
+									 RBM_ZERO_AND_LOCK, NULL);
+			if (BufferGetBlockNumber(buf) != blkno)
+				elog(ERROR, "unexpected block %u of relation \"%s\" fork \"%s\", expected %u",
+					 BufferGetBlockNumber(buf), relname,
+					 forkNames[forknum], blkno);
+
+			/*
+			 * Fill the page, mark it dirty and WAL-log it as one atomic
+			 * step; log_newpage_buffer() requires a critical section.
+			 */
+			START_CRIT_SECTION();
+			memcpy(BufferGetPage(buf), PQgetvalue(res, i, 3), BLCKSZ);
+			MarkBufferDirty(buf);
+			log_newpage_buffer(buf, forknum == MAIN_FORKNUM);
+			END_CRIT_SECTION();
+
+			UnlockReleaseBuffer(buf);
+
+			total += 1;
+		}
+
+		/* cur_relname points into 'res'; done with both from here on */
+		if (rel)
+			relation_close(rel, AccessExclusiveLock);
+	}
+	PG_FINALLY();
+	{
+		/* libpq objects are malloc'ed, so transaction abort won't free them */
+		PQclear(res);
+		PQfinish(conn);
+	}
+	PG_END_TRY();
+
+	PG_RETURN_INT64(total);
+}
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index 1144aee166..a2daebc6b4 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit 1144aee1661c79eec65e784a8dad8bd450d9df79
+Subproject commit a2daebc6b445dcbcca9c18e1711f47c1db7ffb04
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index 1984832c74..2df2ce3744 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit 1984832c740a7fa0e468bb720f40c525b652835d
+Subproject commit 2df2ce374464a7449e15dfa46c956b73b4f4098b