From 847b227dac1ce2e9554a32ff95b8d618f8725843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= Date: Fri, 1 Jan 2021 01:14:47 +0100 Subject: [PATCH 1/2] Add pg_stat_progress_copy view with COPY progress report. --- doc/src/sgml/monitoring.sgml | 101 +++++++++++++++++++++++ src/backend/catalog/system_views.sql | 11 +++ src/backend/commands/copyfrom.c | 16 +++- src/backend/commands/copyfromparse.c | 4 + src/backend/commands/copyto.c | 21 ++++- src/backend/utils/adt/pgstatfuncs.c | 2 + src/include/commands/copyfrom_internal.h | 1 + src/include/commands/progress.h | 5 ++ src/include/pgstat.h | 3 +- src/test/regress/expected/rules.out | 9 ++ 10 files changed, 168 insertions(+), 5 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3d6c901306777..51d261defd94a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_progress_copypg_stat_progress_copy + One row for each backend running COPY, showing current progress. + See . + + @@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, which support progress reporting are ANALYZE, CLUSTER, CREATE INDEX, VACUUM, + COPY, and (i.e., replication command that issues to take a base backup). @@ -6396,6 +6403,100 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, + + + COPY Progress Reporting + + + Whenever COPY is running, the + pg_stat_copy_progress view will contain one row + for each backend that is currently running COPY command. + The table bellow describes the information that will be reported and provide + information how to interpret it. + + + + <structname>pg_stat_progress_copy</structname> View + + + + + Column Type + + + Description + + + + + + + + pid integer + + + Process ID of backend. + + + + + + datid text + + + OID of the database to which this backend is connected. + + + + + + datname name + + + Name of the database to which this backend is connected. + + + + + + relid oid + + + OID of the table on which the COPY command is executed. It is set to 0 if SELECT query is provided. + + + + + + bytes_processed bigint + + + Number of bytes already processed by COPY command. + + + + + + bytes_processed bigint + + + Size of source file for COPY FROM command in bytes. It is set to 0 if not available. + + + + + + line_processed bigint + + + Number of lines already processed by COPY command. + + + + +
+
+ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b140c210bc799..d2fb40d1d0769 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS S.param5 AS tablespaces_streamed FROM pg_stat_get_progress_info('BASEBACKUP') AS S; + +CREATE VIEW pg_stat_progress_copy AS + SELECT + S.pid AS pid, S.datid AS datid, D.datname AS datname, + S.relid AS relid, + S.param1 AS bytes_processed, + S.param2 AS bytes_total, + S.param3 AS lines_processed + FROM pg_stat_get_progress_info('COPY') AS S + LEFT JOIN pg_database D ON S.datid = D.oid; + CREATE VIEW pg_user_mappings AS SELECT U.oid AS umid, diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1b14e9a6eb034..b938a55f66adc 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -25,6 +25,7 @@ #include "access/xlog.h" #include "commands/copy.h" #include "commands/copyfrom_internal.h" +#include "commands/progress.h" #include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -35,6 +36,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate) /* * We count only tuples not suppressed by a BEFORE INSERT trigger * or FDW; this is the same definition used by nodeModifyTable.c - * for counting tuples inserted by an INSERT command. + * for counting tuples inserted by an INSERT command. Update + * progress as well */ - processed++; + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } } @@ -1415,6 +1418,11 @@ BeginCopyFrom(ParseState *pstate, } } + + /* initialize progress */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + cstate->bytes_processed = 0; + /* We keep those variables in cstate. */ cstate->in_functions = in_functions; cstate->typioparams = typioparams; @@ -1479,6 +1487,8 @@ BeginCopyFrom(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); + + pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size); } } @@ -1522,6 +1532,8 @@ EndCopyFrom(CopyFromState cstate) cstate->filename))); } + pgstat_progress_end_command(); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 34ed3cfcd5b43..e655c6ed95027 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -20,11 +20,13 @@ #include "commands/copy.h" #include "commands/copyfrom_internal.h" +#include "commands/progress.h" #include "executor/executor.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "pgstat.h" #include "port/pg_bswap.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate) cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; cstate->raw_buf_len = nbytes; + cstate->bytes_processed += nbytes; + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); return (inbytes > 0); } diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index c7e5f04446310..a6eba642457c9 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -24,6 +24,7 @@ #include "access/xact.h" #include "access/xlog.h" #include "commands/copy.h" +#include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" #include "executor/tuptable.h" @@ -32,6 +33,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -95,6 +97,7 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed;/* total # of bytes processed, used for progress reporting */ } CopyToStateData; @@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate) break; } + /* Update the progress */ + cstate->bytes_processed += fe_msgbuf->len; + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); + resetStringInfo(fe_msgbuf); } @@ -363,6 +370,8 @@ EndCopy(CopyToState cstate) cstate->filename))); } + pgstat_progress_end_command(); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } @@ -760,6 +769,10 @@ BeginCopyTo(ParseState *pstate, } } + /* initialize progress */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + cstate->bytes_processed = 0; + MemoryContextSwitchTo(oldcontext); return cstate; @@ -938,7 +951,9 @@ CopyTo(CopyToState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - processed++; + + /* Increment amount of processed tuples and update the progress */ + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -1303,7 +1318,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - myState->processed++; + + /* Increment amount of processed tuples and update the progress */ + pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed); return true; } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 6afe1b6f56ebc..c83f47390bf4f 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) cmdtype = PROGRESS_COMMAND_CREATE_INDEX; else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0) cmdtype = PROGRESS_COMMAND_BASEBACKUP; + else if (pg_strcasecmp(cmd, "COPY") == 0) + cmdtype = PROGRESS_COMMAND_COPY; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c15ea803c329f..ae76be295a9bf 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -154,6 +154,7 @@ typedef struct CopyFromStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + uint64 bytes_processed;/* total # of bytes processed, used for progress reporting */ /* Shorthand for number of unconsumed bytes available in raw_buf */ #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) } CopyFromStateData; diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 36b073e67757b..fa0f65eb8f587 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -133,4 +133,9 @@ #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 +/* Commands of PROGRESS_COPY */ +#define PROGRESS_COPY_BYTES_PROCESSED 0 +#define PROGRESS_COPY_BYTES_TOTAL 1 +#define PROGRESS_COPY_LINES_PROCESSED 2 + #endif diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 5954068dec534..b2a8cafc90883 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType PROGRESS_COMMAND_ANALYZE, PROGRESS_COMMAND_CLUSTER, PROGRESS_COMMAND_CREATE_INDEX, - PROGRESS_COMMAND_BASEBACKUP + PROGRESS_COMMAND_BASEBACKUP, + PROGRESS_COMMAND_COPY } ProgressCommandType; #define PGSTAT_NUM_PROGRESS_PARAM 20 diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6293ab57bcf61..a687e99d1e4fe 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid, s.param8 AS index_rebuild_count FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); +pg_stat_progress_copy| SELECT s.pid, + s.datid, + d.datname, + s.relid, + s.param1 AS bytes_processed, + s.param2 AS bytes_total, + s.param3 AS lines_processed + FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) + LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_create_index| SELECT s.pid, s.datid, d.datname, From 6d2ee68b227c05ce4d1eb95a4c4a9c4f7dd6fbfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= Date: Tue, 5 Jan 2021 02:07:03 +0100 Subject: [PATCH 2/2] Fix docs and comment. --- doc/src/sgml/monitoring.sgml | 4 ++-- src/include/commands/copyfrom_internal.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 51d261defd94a..875133303e19d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6477,7 +6477,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, - bytes_processed bigint + bytes_total bigint Size of source file for COPY FROM command in bytes. It is set to 0 if not available. @@ -6486,7 +6486,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, - line_processed bigint + lines_processed bigint Number of lines already processed by COPY command. diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index ae76be295a9bf..80fac1e58a12a 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -154,7 +154,7 @@ typedef struct CopyFromStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ - uint64 bytes_processed;/* total # of bytes processed, used for progress reporting */ + uint64 bytes_processed;/* number of bytes processed so far */ /* Shorthand for number of unconsumed bytes available in raw_buf */ #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) } CopyFromStateData;