Add state to keep track of logical replication
authorAndres Freund <andres@anarazel.de>
Sat, 9 Jun 2012 22:16:32 +0000 (00:16 +0200)
committerAndres Freund <andres@anarazel.de>
Mon, 18 Jun 2012 11:24:11 +0000 (13:24 +0200)
In order to have restartable replication with minimal additional writes its
very useful to know up to which point we have replayed/received changes from a
foreign node.

One representation of that is the lsn of changes at the originating cluster.

We need to keep track of the point up to which we received data and up to where
we applied data.

For that we added a field 'origin_lsn' to commit records. This allows to keep
track of the apply position with crash recovery with minimal additional io. We
only added the field to non-compact commit records to reduce the overhead in
case logical replication is not used.

Checkpoints need to keep track of the apply/receive positions as well because
otherwise it would be hard to determine the lsn from where to restart
receive/apply after a shutdown/crash if no changes happened since the last
shutdown/crash.

While running the startup process, the walreceiver and a (future) apply process
will need a coherent picture those two states so add shared memory state to
keep track of it. Currently this is represented in the walreceivers shared
memory segment. This will likely need to change.

During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory, and thus the next checkpoint's,
notion of the apply state.

Missing:

- For correct crash recovery we need more state than the 'apply lsn' because
  transactions on the originating side can overlap. At the lsn we just applied
  many other transaction can be in-progres. To correctly handle that we need to
  keep track of oldest start lsn of any transaction currently being reassembled
  (c.f. ApplyCache). Then we can start to reassemble the ApplyCache up from
  that point and throw away any transaction which comitted before the
  recorded/recovered apply lsn.
  It should be sufficient to store that knowledge in shared memory and
  checkpoint records.

src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/replication/logical/logical.c
src/backend/replication/walreceiverfuncs.c
src/include/access/xact.h
src/include/catalog/pg_control.h
src/include/replication/logical.h
src/include/replication/walreceiver.h

index dfc1ec4fbb567db1f7a9bb0dc6f6ab08daf0225b..a9bf8f5dc841a7bc2889eebe310910544d9440ef 100644 (file)
 #include "replication/logical.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "replication/walreceiver.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
        /*
         * Do we need the long commit record? If not, use the compact format.
         */
-       if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+       if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+           (wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
        {
            XLogRecData rdata[4];
            int         lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
            xlrec.nrels = nrels;
            xlrec.nsubxacts = nchildren;
            xlrec.nmsgs = nmsgs;
+           xlrec.origin_lsn = current_replication_origin_lsn;
+
            rdata[0].data = (char *) (&xlrec);
            rdata[0].len = MinSizeOfXactCommit;
            rdata[0].buffer = InvalidBuffer;
@@ -4550,10 +4555,12 @@ xactGetCommittedChildren(TransactionId **ptr)
  */
 static void
 xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
-                          XLogRecPtr lsn, TransactionId *sub_xids,
-                          int nsubxacts, SharedInvalidationMessage *inval_msgs,
-                          int nmsgs, RelFileNode *xnodes, int nrels,
-                          Oid dbId, Oid tsId, uint32 xinfo)
+                          XLogRecPtr lsn, XLogRecPtr origin_lsn,
+                          TransactionId *sub_xids, int nsubxacts,
+                          SharedInvalidationMessage *inval_msgs, int nmsgs,
+                          RelFileNode *xnodes, int nrels,
+                          Oid dbId, Oid tsId,
+                          uint32 xinfo)
 {
    TransactionId max_xid;
    int         i;
@@ -4576,6 +4583,24 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
        LWLockRelease(XidGenLock);
    }
 
+   /*
+    * record where were at wrt to recovery. We need that to know from where on
+    * to restart applying logical change records
+    * We do this even if logical replay is deactivated for the moment because
+    * we need to know where to restart from if it gets re-enabled.
+    */
+   if ((!XLByteEQ(origin_lsn, InvalidXLogRecPtr)) &&
+       originating_node != InvalidMultimasterNodeId)
+   {
+       /*
+        * probably we don't need the locking because no lcr receiver can run
+        * yet.
+        */
+       SpinLockAcquire(&WalRcv->mutex);
+       WalRcv->mm_applyState[originating_node] = origin_lsn;
+       SpinLockRelease(&WalRcv->mutex);
+   }
+
    if (standbyState == STANDBY_DISABLED)
    {
        /*
@@ -4673,9 +4698,9 @@ xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
    /* invalidation messages array follows subxids */
    inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
-   xact_redo_commit_internal(xid, originating_node, lsn, subxacts,
-                             xlrec->nsubxacts, inval_msgs, xlrec->nmsgs,
-                             xlrec->xnodes, xlrec->nrels,
+   xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+                             subxacts, xlrec->nsubxacts, inval_msgs,
+                             xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
                              xlrec->dbId, xlrec->tsId, xlrec->xinfo);
 }
 
@@ -4686,7 +4711,7 @@ static void
 xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
                            TransactionId xid, XLogRecPtr lsn)
 {
-   xact_redo_commit_internal(xid, originating_node, lsn, xlrec->subxacts,
+   xact_redo_commit_internal(xid, originating_node, lsn, InvalidXLogRecPtr, xlrec->subxacts,
                                xlrec->nsubxacts,
                                NULL, 0,        /* inval msgs */
                                NULL, 0,        /* relfilenodes */
index 5d1232f3099a5849ea5609809e0af4a5d78c4d08..0b1d9257e01b6de2885f1627bb750fe1fbd56e07 100644 (file)
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
    uint64      sysidentifier;
    struct timeval tv;
    pg_crc32    crc;
+   int i;
 
    /*
     * Select a hopefully-unique system identifier code for this installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
    checkPoint.time = (pg_time_t) time(NULL);
    checkPoint.oldestActiveXid = InvalidTransactionId;
 
+   for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId;
+       i++){
+       checkPoint.logicalReceiveState[i] = InvalidXLogRecPtr;
+       checkPoint.logicalApplyState[i] = InvalidXLogRecPtr;
+   }
+
+
    ShmemVariableCache->nextXid = checkPoint.nextXid;
    ShmemVariableCache->nextOid = checkPoint.nextOid;
    ShmemVariableCache->oidCount = 0;
@@ -6315,6 +6323,53 @@ StartupXLOG(void)
        InRecovery = true;
    }
 
+   /*
+    * setup shared memory state for logical wal receiver
+    *
+    * Do this unconditionally so enabling/disabling/enabling logical replay
+    * doesn't loose information due to rewriting pg_control
+    */
+   {
+       int i;
+
+       Assert(WalRcv);
+       /* locking is not really required here afaics, but ... */
+       SpinLockAcquire(&WalRcv->mutex);
+
+       for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+           i++)
+       {
+           XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+           XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+           if(i == guc_replication_origin_id && (
+                  !XLByteEQ(*receiveState, InvalidXLogRecPtr) ||
+                  !XLByteEQ(*applyState, InvalidXLogRecPtr))
+               )
+           {
+               elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+                    applyState->xlogid, applyState->xrecoff,
+                    receiveState->xlogid, receiveState->xrecoff,
+                    guc_replication_origin_id);
+               WalRcv->mm_receiveState[i] = InvalidXLogRecPtr;
+               WalRcv->mm_applyState[i] = InvalidXLogRecPtr;
+           }
+           else{
+               WalRcv->mm_receiveState[i] = *receiveState;
+               WalRcv->mm_applyState[i] = *applyState;
+           }
+       }
+       SpinLockRelease(&WalRcv->mutex);
+
+       /* FIXME: remove at some point */
+       for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+           i++){
+           elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+                i,
+                WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+                WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+       }
+   }
+
    /* REDO */
    if (InRecovery)
    {
@@ -7904,6 +7959,24 @@ CreateCheckPoint(int flags)
                             &checkPoint.nextMulti,
                             &checkPoint.nextMultiOffset);
 
+   /*
+    * fill out where are at wrt logical replay. Do this unconditionally so we
+    * don't loose information due to rewriting pg_control when toggling
+    * logical replay
+    */
+   {
+       int i;
+       SpinLockAcquire(&WalRcv->mutex);
+
+       for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+           i++){
+           checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+           checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+       }
+       SpinLockRelease(&WalRcv->mutex);
+       elog(LOG, "updated logical checkpoint data");
+   }
+
    /*
     * Having constructed the checkpoint record, ensure all shmem disk buffers
     * and commit-log buffers are flushed to disk.
index 1d9e9578dfd1221fe6ac358ba344d3713a27255b..4f344885e389b4acfb5fab497d260a2291c6de35 100644 (file)
@@ -16,3 +16,4 @@
 #include "replication/logical.h"
 int guc_replication_origin_id = InvalidMultimasterNodeId;
 RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
index 876196f9da397fe578a5b1819f6d34ff9d563b3f..cb49282e73dd20c60bbe4471ef8da6de41fefd25 100644 (file)
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
        MemSet(WalRcv, 0, WalRcvShmemSize());
        WalRcv->walRcvState = WALRCV_STOPPED;
        SpinLockInit(&WalRcv->mutex);
+
+       memset(&WalRcv->mm_receiveState,
+              0, sizeof(WalRcv->mm_receiveState));
+       memset(&WalRcv->mm_applyState,
+              0, sizeof(WalRcv->mm_applyState));
+
+       memset(&WalRcv->mm_receiveLatch,
+              0, sizeof(WalRcv->mm_receiveLatch));
    }
 }
 
index b12d2a0068531d311137ed5b18c8cd17e015cd0f..275778257527938e54a626eac857cf5165ec3f41 100644 (file)
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
    int         nmsgs;          /* number of shared inval msgs */
    Oid         dbId;           /* MyDatabaseId */
    Oid         tsId;           /* MyDatabaseTableSpace */
+   XLogRecPtr  origin_lsn;     /* location of originating commit */
    /* Array of RelFileNode(s) to drop at commit */
    RelFileNode xnodes[1];      /* VARIABLE LENGTH ARRAY */
    /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
index 5cff39608bae71caa16e6f99039baa60c7491737..bc6316ec7719bc5a133e934ba93dcfb29a9d387a 100644 (file)
 #define PG_CONTROL_H
 
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
 #include "pgtime.h"                /* for pg_time_t */
 #include "utils/pg_crc.h"
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 922
+#define PG_CONTROL_VERSION 923
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
     * it's set to InvalidTransactionId.
     */
    TransactionId oldestActiveXid;
+
+   /*
+    * The replay state from every other node. This is only needed if wal_level
+    * >= logical and thus is only filled then.
+    */
+   XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+   XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
 } CheckPoint;
 
 /* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
  * NOTE: try to keep this under 512 bytes so that it will fit on one physical
  * sector of typical disk drives.  This reduces the odds of corruption due to
  * power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
  */
 
 typedef struct ControlFileData
index c3680ef8e81e88f802c0fcf1ce1952dd3ab2a561..8f44fad82d6aaf25d96e5da95885fec06d5d1bf9 100644 (file)
@@ -15,6 +15,7 @@
 
 extern int guc_replication_origin_id;
 extern RepNodeId current_replication_origin_id;
+extern XLogRecPtr current_replication_origin_lsn;
 
 #define InvalidMultimasterNodeId 0
 #define MaxMultimasterNodeId (2<<3)
index d21ec94a45511b3432515f3b7175da883a112503..c9ab1be4192e190db0416b5d9c33a98731712117 100644 (file)
@@ -14,6 +14,8 @@
 
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -90,6 +92,17 @@ typedef struct
    char        conninfo[MAXCONNINFO];
 
    slock_t     mutex;          /* locks shared variables shown above */
+
+   /*
+    * replay point up to which we replayed for every node
+    * XXX: should possibly be dynamically sized?
+    * FIXME: should go to its own shm segment?
+    */
+   XLogRecPtr  mm_receiveState[MaxMultimasterNodeId - 1];
+   XLogRecPtr  mm_applyState[MaxMultimasterNodeId - 1];
+
+   Latch*       mm_receiveLatch[MaxMultimasterNodeId - 1];
+
 } WalRcvData;
 
 extern WalRcvData *WalRcv;