Introduce the concept that wal has an 'origin' node
authorAndres Freund <andres@anarazel.de>
Sat, 9 Jun 2012 12:49:34 +0000 (14:49 +0200)
committerAndres Freund <andres@anarazel.de>
Mon, 18 Jun 2012 11:11:40 +0000 (13:11 +0200)
One solution to avoid loops when doing wal based logical replication in
topologies which are more complex than one unidirectional transport is
introducing the concept of an 'origin_id' into the wal stream. Luckily there is
some padding in the XLogRecord struct that allows us to add that field without
further bloating the struct.
This solution was chosen because it allows for just about any topology and is
unobtrusive.

This adds a new configuration parameter multimaster_node_id which determines
the id used for wal originating in one cluster.

When applying changes from wal from another cluster, code can set the variable
current_replication_origin_id. This is a global variable because passing it
through everything which can generate wal would be far too intrusive.

src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/logical.c [new file with mode: 0644]
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/xlog.h
src/include/access/xlogdefs.h
src/include/replication/logical.h [new file with mode: 0644]

index 7260f9c50f7456b2576dc2ebfb7e64184f9e7b55..dfc1ec4fbb567db1f7a9bb0dc6f6ab08daf0225b 100644 (file)
@@ -36,8 +36,9 @@
 #include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"
 #include "replication/syncrep.h"
+#include "replication/walsender.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
@@ -4548,12 +4549,11 @@ xactGetCommittedChildren(TransactionId **ptr)
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
-                         TransactionId *sub_xids, int nsubxacts,
-                         SharedInvalidationMessage *inval_msgs, int nmsgs,
-                         RelFileNode *xnodes, int nrels,
-                         Oid dbId, Oid tsId,
-                         uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+                          XLogRecPtr lsn, TransactionId *sub_xids,
+                          int nsubxacts, SharedInvalidationMessage *inval_msgs,
+                          int nmsgs, RelFileNode *xnodes, int nrels,
+                          Oid dbId, Oid tsId, uint32 xinfo)
 {
    TransactionId max_xid;
    int         i;
@@ -4662,8 +4662,8 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
  * Utility function to call xact_redo_commit_internal after breaking down xlrec
  */
 static void
-xact_redo_commit(xl_xact_commit *xlrec,
-                TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+                           TransactionId xid, XLogRecPtr lsn)
 {
    TransactionId *subxacts;
    SharedInvalidationMessage *inval_msgs;
@@ -4673,27 +4673,26 @@ xact_redo_commit(xl_xact_commit *xlrec,
    /* invalidation messages array follows subxids */
    inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
-   xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
-                             inval_msgs, xlrec->nmsgs,
-                             xlrec->xnodes, xlrec->nrels,
-                             xlrec->dbId,
-                             xlrec->tsId,
-                             xlrec->xinfo);
+   xact_redo_commit_internal(xid, originating_node, lsn, subxacts,
+                             xlrec->nsubxacts, inval_msgs, xlrec->nmsgs,
+                             xlrec->xnodes, xlrec->nrels,
+                             xlrec->dbId, xlrec->tsId, xlrec->xinfo);
 }
 
 /*
  * Utility function to call xact_redo_commit_internal  for compact form of message.
  */
 static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
-                        TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+                           TransactionId xid, XLogRecPtr lsn)
 {
-   xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
-                             NULL, 0,  /* inval msgs */
-                             NULL, 0,  /* relfilenodes */
-                             InvalidOid,       /* dbId */
-                             InvalidOid,       /* tsId */
-                             0);       /* xinfo */
+   xact_redo_commit_internal(xid, originating_node, lsn, xlrec->subxacts,
+                               xlrec->nsubxacts,
+                               NULL, 0,        /* inval msgs */
+                               NULL, 0,        /* relfilenodes */
+                               InvalidOid,     /* dbId */
+                               InvalidOid,     /* tsId */
+                               0);             /* xinfo */
 }
 
 /*
@@ -4789,17 +4788,18 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
    /* Backup blocks are not used in xact records */
    Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
+   /* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */
    if (info == XLOG_XACT_COMMIT_COMPACT)
    {
        xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
 
-       xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+       xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);
    }
    else if (info == XLOG_XACT_COMMIT)
    {
        xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
-       xact_redo_commit(xlrec, record->xl_xid, lsn);
+       xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);
    }
    else if (info == XLOG_XACT_ABORT)
    {
@@ -4817,7 +4817,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
    {
        xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
 
-       xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+       xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);
        RemoveTwoPhaseFile(xlrec->xid, false);
    }
    else if (info == XLOG_XACT_ABORT_PREPARED)
index 5d4e09f89a501ebfcf0d3e6798e935e145c49b08..2bcc0a28c8a24361a73332fb25b6c51f3f5c1a64 100644 (file)
@@ -42,6 +42,7 @@
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/logical.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -1032,7 +1033,7 @@ begin:;
    record->xl_len = len;       /* doesn't include backup blocks */
    record->xl_info = info;
    record->xl_rmid = rmid;
-
+   record->xl_origin_id = current_replication_origin_id;
    /* Now we can finish computing the record's CRC */
    COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
               SizeOfXLogRecord - sizeof(pg_crc32));
@@ -5243,6 +5244,7 @@ BootStrapXLOG(void)
    record->xl_len = sizeof(checkPoint);
    record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
    record->xl_rmid = RM_XLOG_ID;
+   record->xl_origin_id = InvalidMultimasterNodeId;
    memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
 
    INIT_CRC32(crc);
index 6f15d66f6579dda6ab3d462d8b364bb2c1cd2ddc..bacd31ef93ad3e0cb1b1bf04fb92d697840bad60 100644 (file)
@@ -24,6 +24,7 @@
 #include "access/xlogreader.h"
 
 /* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */
 #include "replication/walsender_private.h"
 #include "replication/walprotocol.h"
 
@@ -563,6 +564,7 @@ XLogReaderRead(XLogReaderState* state)
                spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
                spacer.xl_rmid = RM_XLOG_ID;
                spacer.xl_info = XLOG_NOOP;
+               spacer.xl_origin_id = InvalidMultimasterNodeId;
 
                state->writeout_data(state,
                                     (char*)&spacer,
index 7dd966325ed23ec318c488228983b795ea5741b2..c2d6d8234ad2e474b170e4566514f22c0d84046f 100644 (file)
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
new file mode 100644 (file)
index 0000000..1d9e957
--- /dev/null
@@ -0,0 +1,18 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/replication/logical/logical.c
+ *
+ */
+#include "postgres.h"
+#include "replication/logical.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;   /* storage for the multimaster_node_id GUC */
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;   /* origin id copied into xl_origin_id of newly inserted WAL records */
index 93c798b5014d65245ef5c9d738a3542cb8144a60..46b0657cb497b96b503483f283e4d1b1dc66095d 100644 (file)
@@ -60,6 +60,7 @@
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/logical.h"
 #include "storage/bufmgr.h"
 #include "storage/standby.h"
 #include "storage/fd.h"
@@ -198,6 +199,7 @@ static const char *show_tcp_keepalives_interval(void);
 static const char *show_tcp_keepalives_count(void);
 static bool check_maxconnections(int *newval, void **extra, GucSource source);
 static void assign_maxconnections(int newval, void *extra);
+static void assign_replication_node_id(int newval, void *extra);
 static bool check_maxworkers(int *newval, void **extra, GucSource source);
 static void assign_maxworkers(int newval, void *extra);
 static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
@@ -1597,6 +1599,16 @@ static struct config_int ConfigureNamesInt[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+           gettext_noop("node id for multimaster."),
+           NULL
+       },
+       &guc_replication_origin_id,
+       InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+       NULL, assign_replication_node_id, NULL
+   },
+
    {
        {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
            gettext_noop("Sets the maximum number of concurrent connections."),
@@ -8629,6 +8641,13 @@ assign_maxconnections(int newval, void *extra)
    MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
 }
 
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+   guc_replication_origin_id = newval;
+   current_replication_origin_id = newval;   /* XLogInsert copies this into xl_origin_id, so mirror the GUC value here */
+}
+
 static bool
 check_maxworkers(int *newval, void **extra, GucSource source)
 {
index ce3fc08ffa37bd71b4d51c7b8c59d4a19b341798..12f8a3fe7ecb4e11c606a75a0fb159060b2494ea 100644 (file)
 #hot_standby_feedback = off        # send info from standby to prevent
                    # query conflicts
 
+# - Multi Master Servers -
+
+#multimaster_node_id = 0 #invalid node id
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
index 5ecf5c49304692abc3702b1089586a803f2b20f4..f35d883ff08c8c088da4a0f93610387da1e70218 100644 (file)
@@ -54,7 +54,7 @@
 #include "access/xlog_internal.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
-
+#include "replication/logical.h"
 extern int optind;
 extern char *optarg;
 
@@ -961,6 +961,7 @@ WriteEmptyXLOG(void)
    record->xl_len = sizeof(CheckPoint);
    record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
    record->xl_rmid = RM_XLOG_ID;
+   record->xl_origin_id = InvalidMultimasterNodeId;
    memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
           sizeof(CheckPoint));
 
index 2843acac97a60566bcba17de42cb53ddd9ac5be4..dd89cff8c4276d2270a9b777b7be8ddf0d034951 100644 (file)
@@ -47,8 +47,8 @@ typedef struct XLogRecord
    uint32      xl_len;         /* total len of rmgr data */
    uint8       xl_info;        /* flag bits, see below */
    RmgrId      xl_rmid;        /* resource manager for this record */
-
-   /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+   RepNodeId   xl_origin_id;   /* what node did originally cause this record to be written */
+   /* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */
 
    /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
 
index 5e6d7e600bdd1dcc7a70b52d96e47feef757fb8b..c774285d8938fccb7d498c2195ee9bada137bbf0 100644 (file)
@@ -83,6 +83,8 @@ typedef struct XLogRecPtr
  */
 typedef uint32 TimeLineID;
 
+typedef uint16 RepNodeId;
+
 /*
  * Because O_DIRECT bypasses the kernel buffers, and because we never
  * read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644 (file)
index 0000000..e04c7f3
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id;   /* backing variable of the multimaster_node_id GUC */
+extern RepNodeId current_replication_origin_id;   /* origin id stamped on WAL records written by this process */
+
+#define InvalidMultimasterNodeId 0   /* GUC default and lower bound; also used for bootstrap/spacer records */
+#define MaxMultimasterNodeId (2<<3)   /* GUC upper bound, evaluates to 16. NOTE(review): RepNodeId is uint16 -- confirm 16 is the intended limit */
+#endif   /* LOGICAL_H */