Use cursor to copy data

This commit is contained in:
Konstantin Knizhnik
2023-07-05 21:49:55 +03:00
parent 06357afe6d
commit 1389927d36

View File

@@ -107,6 +107,7 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
#define N_RAW_PAGE_COLUMNS 4
#define COPY_FETCH_COUNT 16
static void
@@ -259,7 +260,9 @@ copy_from(PG_FUNCTION_ARGS)
{
char const* conninfo = PG_GETARG_CSTRING(0);
PGconn* conn;
char const* sql = "select * from copydata() as raw_page(relid text, fork text, blkno integer, content bytea)";
char const* declare_cursor = "declare copy_data_cursor no scroll cursor for select * from copydata() as raw_page(relid text, fork text, blkno integer, content bytea)";
char* fetch_cursor = psprintf("fetch forward %d copy_data_cursor", COPY_FETCH_COUNT);
char const* close_cursor = "close copy_data_cursor";
char *content;
char const* relname;
BlockNumber blkno;
@@ -291,11 +294,23 @@ copy_from(PG_FUNCTION_ARGS)
report_error(ERROR, res, conn, true, CREATE_COPYDATA_FUNC);
PQclear(res);
/* Start transaction to use cursor */
res = PQexec(conn, "BEGIN");
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
report_error(ERROR, res, conn, true, "BEGIN");
PQclear(res);
/* Declare cursor (we have to use cursor to avoid materializing all database in memory) */
res = PQexec(conn, declare_cursor);
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
report_error(ERROR, res, conn, true, declare_cursor);
PQclear(res);
/* Get database data */
for (res = PQexecParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1); res != NULL; res = get_result(conn, sql))
while ((res = PQexecParams(conn, fetch_cursor, 0, NULL, NULL, NULL, NULL, 1)) != NULL)
{
if (PQresultStatus(res) != PGRES_TUPLES_OK)
report_error(ERROR, res, conn, true, sql);
report_error(ERROR, res, conn, true, fetch_cursor);
n_tuples = PQntuples(res);
if (PQnfields(res) != 4)
@@ -340,10 +355,23 @@ copy_from(PG_FUNCTION_ARGS)
prev_blkno = blkno;
}
PQclear(res);
res = get_result(conn, sql);
if (n_tuples < COPY_FETCH_COUNT)
break;
}
res = PQexec(conn, close_cursor);
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
report_error(ERROR, res, conn, true, close_cursor);
PQclear(res);
if (rel)
relation_close(rel, AccessExclusiveLock);
/* Complete transaction */
res = PQexec(conn, "END");
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
report_error(ERROR, res, conn, true, "END");
PQclear(res);
PQfinish(conn);
PG_RETURN_INT64(total);
}