diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index f3399b8cba..24eb76b2d7 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -107,6 +107,7 @@ backpressure_throttling_time(PG_FUNCTION_ARGS) #define N_RAW_PAGE_COLUMNS 4 +#define COPY_FETCH_COUNT 16 static void @@ -259,7 +260,9 @@ copy_from(PG_FUNCTION_ARGS) { char const* conninfo = PG_GETARG_CSTRING(0); PGconn* conn; - char const* sql = "select * from copydata() as raw_page(relid text, fork text, blkno integer, content bytea)"; + char const* declare_cursor = "declare copy_data_cursor no scroll cursor for select * from copydata() as raw_page(relid text, fork text, blkno integer, content bytea)"; + char* fetch_cursor = psprintf("fetch forward %d copy_data_cursor", COPY_FETCH_COUNT); + char const* close_cursor = "close copy_data_cursor"; char *content; char const* relname; BlockNumber blkno; @@ -291,11 +294,23 @@ copy_from(PG_FUNCTION_ARGS) report_error(ERROR, res, conn, true, CREATE_COPYDATA_FUNC); PQclear(res); + /* Start transaction to use cursor */ + res = PQexec(conn, "BEGIN"); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) + report_error(ERROR, res, conn, true, "BEGIN"); + PQclear(res); + + /* Declare cursor (we have to use cursor to avoid materializing all database in memory) */ + res = PQexec(conn, declare_cursor); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) + report_error(ERROR, res, conn, true, declare_cursor); + PQclear(res); + /* Get database data */ - for (res = PQexecParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1); res != NULL; res = get_result(conn, sql)) + while ((res = PQexecParams(conn, fetch_cursor, 0, NULL, NULL, NULL, NULL, 1)) != NULL) { if (PQresultStatus(res) != PGRES_TUPLES_OK) - report_error(ERROR, res, conn, true, sql); + report_error(ERROR, res, conn, true, fetch_cursor); n_tuples = PQntuples(res); if (PQnfields(res) != 4) @@ -340,10 +355,23 @@ copy_from(PG_FUNCTION_ARGS) prev_blkno = blkno; } PQclear(res); - res = get_result(conn, sql); + if (n_tuples < COPY_FETCH_COUNT) + break; } + res = PQexec(conn, close_cursor); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) + report_error(ERROR, res, conn, true, close_cursor); + PQclear(res); + if (rel) relation_close(rel, AccessExclusiveLock); + + /* Complete transaction */ + res = PQexec(conn, "END"); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) + report_error(ERROR, res, conn, true, "END"); + PQclear(res); + PQfinish(conn); PG_RETURN_INT64(total); }