#include "replication/logical.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/walreceiver.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+ (wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
{
XLogRecData rdata[4];
int lastrdata = 0;
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
+ xlrec.origin_lsn = current_replication_origin_lsn;
+
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
*/
static void
xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
- XLogRecPtr lsn, TransactionId *sub_xids,
- int nsubxacts, SharedInvalidationMessage *inval_msgs,
- int nmsgs, RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId, uint32 xinfo)
+ XLogRecPtr lsn, XLogRecPtr origin_lsn,
+ TransactionId *sub_xids, int nsubxacts,
+ SharedInvalidationMessage *inval_msgs, int nmsgs,
+ RelFileNode *xnodes, int nrels,
+ Oid dbId, Oid tsId,
+ uint32 xinfo)
{
TransactionId max_xid;
int i;
LWLockRelease(XidGenLock);
}
+ /*
+ * record where were at wrt to recovery. We need that to know from where on
+ * to restart applying logical change records
+ * We do this even if logical replay is deactivated for the moment because
+ * we need to know where to restart from if it gets re-enabled.
+ */
+ if ((!XLByteEQ(origin_lsn, InvalidXLogRecPtr)) &&
+ originating_node != InvalidMultimasterNodeId)
+ {
+ /*
+ * probably we don't need the locking because no lcr receiver can run
+ * yet.
+ */
+ SpinLockAcquire(&WalRcv->mutex);
+ WalRcv->mm_applyState[originating_node] = origin_lsn;
+ SpinLockRelease(&WalRcv->mutex);
+ }
+
if (standbyState == STANDBY_DISABLED)
{
/*
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
- xact_redo_commit_internal(xid, originating_node, lsn, subxacts,
- xlrec->nsubxacts, inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
+ xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+ subxacts, xlrec->nsubxacts, inval_msgs,
+ xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
xlrec->dbId, xlrec->tsId, xlrec->xinfo);
}
xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
TransactionId xid, XLogRecPtr lsn)
{
- xact_redo_commit_internal(xid, originating_node, lsn, xlrec->subxacts,
+ xact_redo_commit_internal(xid, originating_node, lsn, InvalidXLogRecPtr, xlrec->subxacts,
xlrec->nsubxacts,
NULL, 0, /* inval msgs */
NULL, 0, /* relfilenodes */
uint64 sysidentifier;
struct timeval tv;
pg_crc32 crc;
+ int i;
/*
* Select a hopefully-unique system identifier code for this installation.
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId;
+ i++){
+ checkPoint.logicalReceiveState[i] = InvalidXLogRecPtr;
+ checkPoint.logicalApplyState[i] = InvalidXLogRecPtr;
+ }
+
+
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
InRecovery = true;
}
+ /*
+ * setup shared memory state for logical wal receiver
+ *
+ * Do this unconditionally so enabling/disabling/enabling logical replay
+ * doesn't loose information due to rewriting pg_control
+ */
+ {
+ int i;
+
+ Assert(WalRcv);
+ /* locking is not really required here afaics, but ... */
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++)
+ {
+ XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+ XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+ if(i == guc_replication_origin_id && (
+ !XLByteEQ(*receiveState, InvalidXLogRecPtr) ||
+ !XLByteEQ(*applyState, InvalidXLogRecPtr))
+ )
+ {
+ elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+ applyState->xlogid, applyState->xrecoff,
+ receiveState->xlogid, receiveState->xrecoff,
+ guc_replication_origin_id);
+ WalRcv->mm_receiveState[i] = InvalidXLogRecPtr;
+ WalRcv->mm_applyState[i] = InvalidXLogRecPtr;
+ }
+ else{
+ WalRcv->mm_receiveState[i] = *receiveState;
+ WalRcv->mm_applyState[i] = *applyState;
+ }
+ }
+ SpinLockRelease(&WalRcv->mutex);
+
+ /* FIXME: remove at some point */
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+ i,
+ WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+ WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+ }
+ }
+
/* REDO */
if (InRecovery)
{
&checkPoint.nextMulti,
&checkPoint.nextMultiOffset);
+ /*
+ * fill out where are at wrt logical replay. Do this unconditionally so we
+ * don't loose information due to rewriting pg_control when toggling
+ * logical replay
+ */
+ {
+ int i;
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+ checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+ }
+ SpinLockRelease(&WalRcv->mutex);
+ elog(LOG, "updated logical checkpoint data");
+ }
+
/*
* Having constructed the checkpoint record, ensure all shmem disk buffers
* and commit-log buffers are flushed to disk.
#include "replication/logical.h"
int guc_replication_origin_id = InvalidMultimasterNodeId;
RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
+
+ memset(&WalRcv->mm_receiveState,
+ 0, sizeof(WalRcv->mm_receiveState));
+ memset(&WalRcv->mm_applyState,
+ 0, sizeof(WalRcv->mm_applyState));
+
+ memset(&WalRcv->mm_receiveLatch,
+ 0, sizeof(WalRcv->mm_receiveLatch));
}
}
int nmsgs; /* number of shared inval msgs */
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
+ XLogRecPtr origin_lsn; /* location of originating commit */
/* Array of RelFileNode(s) to drop at commit */
RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
#define PG_CONTROL_H
#include "access/xlogdefs.h"
+#include "replication/logical.h"
#include "pgtime.h" /* for pg_time_t */
#include "utils/pg_crc.h"
/* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 922
+#define PG_CONTROL_VERSION 923
/*
* Body of CheckPoint XLOG records. This is declared here because we keep
* it's set to InvalidTransactionId.
*/
TransactionId oldestActiveXid;
+
+ /*
+ * The replay state from every other node. This is only needed if wal_level
+ * >= logical and thus is only filled then.
+ */
+ XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+ XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
} CheckPoint;
/* XLOG info values for XLOG rmgr */
* NOTE: try to keep this under 512 bytes so that it will fit on one physical
* sector of typical disk drives. This reduces the odds of corruption due to
* power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
*/
typedef struct ControlFileData
extern int guc_replication_origin_id;
extern RepNodeId current_replication_origin_id;
+extern XLogRecPtr current_replication_origin_lsn;
#define InvalidMultimasterNodeId 0
#define MaxMultimasterNodeId (2<<3)
#include "access/xlog.h"
#include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * replay point up to which we replayed for every node
+ * XXX: should possibly be dynamically sized?
+ * FIXME: should go to its own shm segment?
+ */
+ XLogRecPtr mm_receiveState[MaxMultimasterNodeId - 1];
+ XLogRecPtr mm_applyState[MaxMultimasterNodeId - 1];
+
+ Latch* mm_receiveLatch[MaxMultimasterNodeId - 1];
+
} WalRcvData;
extern WalRcvData *WalRcv;