Add view to inspect postgres log through SQL

This commit is contained in:
Konstantin Knizhnik
2023-04-25 17:37:22 +03:00
parent afbbc61036
commit 44dfe405cf
2 changed files with 171 additions and 0 deletions

View File

@@ -32,3 +32,40 @@ CREATE VIEW local_cache AS
SELECT P.* FROM local_cache_pages() AS P
(pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid,
relforknumber int2, relblocknumber int8, accesscount int4);
create table postgres_log (
log_time timestamp(3) with time zone,
user_name text,
database_name text,
process_id integer,
connection_from text,
session_id text,
session_line_num bigint,
command_tag text,
session_start_time timestamp with time zone,
virtual_transaction_id text,
transaction_id bigint,
error_severity text,
sql_state_code text,
message text,
detail text,
hint text,
internal_query text,
internal_query_pos integer,
context text,
query text,
query_pos integer,
location text,
application_name text,
backend_type text,
leader_pid integer,
query_id bigint
);
CREATE FUNCTION read_postgres_log()
RETURNS setof postgres_log
AS 'MODULE_PATHNAME', 'read_postgres_log'
LANGUAGE C PARALLEL SAFE;
CREATE VIEW pg_log AS
SELECT * from read_postgres_log();

View File

@@ -11,8 +11,14 @@
#include "postgres.h"
#include "fmgr.h"
#include <sys/stat.h>
#include "access/table.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
#include "nodes/makefuncs.h"
#include "nodes/value.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
@@ -20,6 +26,7 @@
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/guc.h"
#include "neon.h"
@@ -40,6 +47,7 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
PG_FUNCTION_INFO_V1(read_postgres_log);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -85,3 +93,129 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
#define PG_LOG_DIR "log"
#define POSTGRES_LOG "postgres_log"
#define LOG_TABLE_N_COLUMS 26
typedef struct {
char* path;
time_t ctime;
} LogFile;
typedef struct
{
Relation log_table;
List* log_files;
CopyFromState copy_state;
ListCell* curr_log;
} LogfileContext;
static int cmp_log_ctime(const ListCell *a, const ListCell *b)
{
LogFile* la = (LogFile*)lfirst(a);
LogFile* lb = (LogFile*)lfirst(b);
return la->ctime < lb->ctime ? -1 : la->ctime == lb->ctime ? 0 : 1;
}
Datum
read_postgres_log(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
LogfileContext *fctx; /* User function context. */
List* log_files = NULL;
if (SRF_IS_FIRSTCALL())
{
struct dirent *dent;
DIR* dir;
struct stat statbuf;
char* path;
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Create a user function context for cross-call persistence */
fctx = (LogfileContext *) palloc(sizeof(LogfileContext));
fctx->log_files = NULL;
fctx->copy_state = NULL;
if ((dir = AllocateDir(PG_LOG_DIR)) != NULL)
{
while ((dent = ReadDirExtended(dir, PG_LOG_DIR, LOG)) != NULL)
{
/* Ignore non-csv files */
if (strcmp(dent->d_name + strlen(dent->d_name) - 4, ".csv") != 0)
continue;
path = psprintf("%s/%s", PG_LOG_DIR, dent->d_name);
if (stat(path, &statbuf) == 0)
{
LogFile* log = (LogFile*)palloc(sizeof(LogFile));
log->ctime = statbuf.st_ctime;
log->path = path;
fctx->log_files = lappend(fctx->log_files, log);
}
else if (errno != ENOENT) /* file can be concurrently removed */
{
elog(LOG, "Failed to access log file %s", path);
pfree(path);
}
}
FreeDir(dir);
}
list_sort(fctx->log_files, cmp_log_ctime);
fctx->log_table = table_openrv(makeRangeVar(NULL, POSTGRES_LOG, -1), AccessShareLock);
fctx->curr_log = list_head(fctx->log_files);
/* Remember the user function context. */
funcctx->user_fctx = fctx;
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state */
fctx = funcctx->user_fctx;
while (fctx->curr_log != NULL)
{
if (fctx->copy_state == NULL)
{
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx->copy_state = BeginCopyFrom(NULL,
fctx->log_table,
NULL,
((LogFile*)lfirst(fctx->curr_log))->path,
false,
NULL,
NIL,
list_make1(makeDefElem("format", (Node *) makeString("csv"), -1)));
MemoryContextSwitchTo(oldcontext);
}
if (fctx->copy_state != NULL)
{
Datum values[LOG_TABLE_N_COLUMS];
bool nulls[LOG_TABLE_N_COLUMS];
if (NextCopyFrom(fctx->copy_state, NULL,
values, nulls))
{
HeapTuple tuple = heap_form_tuple(RelationGetDescr(fctx->log_table), values, nulls);
Datum result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
EndCopyFrom(fctx->copy_state);
fctx->copy_state = NULL;
}
fctx->curr_log = lnext(fctx->log_files, fctx->curr_log);
}
table_close(fctx->log_table, AccessShareLock);
SRF_RETURN_DONE(funcctx);
}