#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"
#include "replication/syncrep.h"
+#include "replication/walsender.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+ XLogRecPtr lsn, TransactionId *sub_xids,
+ int nsubxacts, SharedInvalidationMessage *inval_msgs,
+ int nmsgs, RelFileNode *xnodes, int nrels,
+ Oid dbId, Oid tsId, uint32 xinfo)
{
TransactionId max_xid;
int i;
* Utility function to call xact_redo_commit_internal after breaking down xlrec
*/
static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
- xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
+ xact_redo_commit_internal(xid, originating_node, lsn, subxacts,
+ xlrec->nsubxacts, inval_msgs, xlrec->nmsgs,
+ xlrec->xnodes, xlrec->nrels,
+ xlrec->dbId, xlrec->tsId, xlrec->xinfo);
}
/*
* Utility function to call xact_redo_commit_internal for compact form of message.
*/
static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
{
- xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
+ xact_redo_commit_internal(xid, originating_node, lsn, xlrec->subxacts,
+ xlrec->nsubxacts,
+ NULL, 0, /* inval msgs */
+ NULL, 0, /* relfilenodes */
+ InvalidOid, /* dbId */
+ InvalidOid, /* tsId */
+ 0); /* xinfo */
}
/*
/* Backup blocks are not used in xact records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ /* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */
if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
- xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+ xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- xact_redo_commit(xlrec, record->xl_xid, lsn);
+ xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
- xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+ xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
-
+ record->xl_origin_id = current_replication_origin_id;
/* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
SizeOfXLogRecord - sizeof(pg_crc32));
record->xl_len = sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+ record->xl_origin_id = InvalidMultimasterNodeId;
memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
INIT_CRC32(crc);
#include "access/xlogreader.h"
/* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */
#include "replication/walsender_private.h"
#include "replication/walprotocol.h"
spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
spacer.xl_rmid = RM_XLOG_ID;
spacer.xl_info = XLOG_NOOP;
+ spacer.xl_origin_id = InvalidMultimasterNodeId;
state->writeout_data(state,
(char*)&spacer,
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical.c
+ *
+ */
+#include "postgres.h"
+#include "replication/logical.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h"
static const char *show_tcp_keepalives_count(void);
static bool check_maxconnections(int *newval, void **extra, GucSource source);
static void assign_maxconnections(int newval, void *extra);
+static void assign_replication_node_id(int newval, void *extra);
static bool check_maxworkers(int *newval, void **extra, GucSource source);
static void assign_maxworkers(int newval, void *extra);
static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
NULL, NULL, NULL
},
+ {
+ {"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("node id for multimaster."),
+ NULL
+ },
+ &guc_replication_origin_id,
+ InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+ NULL, assign_replication_node_id, NULL
+ },
+
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
}
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+ guc_replication_origin_id = newval;
+ current_replication_origin_id = newval;
+}
+
static bool
check_maxworkers(int *newval, void **extra, GucSource source)
{
#hot_standby_feedback = off # send info from standby to prevent
# query conflicts
+# - Multi Master Servers -
+
+#multimaster_node_id = 0 #invalid node id
#------------------------------------------------------------------------------
# QUERY TUNING
#include "access/xlog_internal.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
-
+#include "replication/logical.h"
extern int optind;
extern char *optarg;
record->xl_len = sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+ record->xl_origin_id = InvalidMultimasterNodeId;
memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
sizeof(CheckPoint));
uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */
-
- /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+ RepNodeId xl_origin_id; /* what node did originally cause this record to be written */
+ /* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */
/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
*/
typedef uint32 TimeLineID;
+typedef uint16 RepNodeId;
+
/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
--- /dev/null
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id;
+extern RepNodeId current_replication_origin_id;
+
+#define InvalidMultimasterNodeId 0
+#define MaxMultimasterNodeId (2<<3)
+#endif