Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
632 changes: 593 additions & 39 deletions src/backend/replication/logical/applyparallelworker.c

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions src/backend/replication/logical/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,44 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
logicalrep_write_attrs(out, rel, columns, include_gencols_type);
}

/*
* Write internal relation description to the output stream.
*/
void
logicalrep_write_internal_rel(StringInfo out, LogicalRepRelation *rel)
{
pq_sendint32(out, rel->remoteid);

/* Write relation name */
pq_sendstring(out, rel->nspname);
pq_sendstring(out, rel->relname);

/* Write the replica identity. */
pq_sendbyte(out, rel->replident);

/* Write attribute description */
pq_sendint16(out, rel->natts);

for (int i = 0; i < rel->natts; i++)
{
uint8 flags = 0;

if (bms_is_member(i, rel->attkeys))
flags |= LOGICALREP_IS_REPLICA_IDENTITY;

pq_sendbyte(out, flags);

/* attribute name */
pq_sendstring(out, rel->attnames[i]);

/* attribute type id */
pq_sendint32(out, rel->atttyps[i]);

/* ignore attribute mode for now */
pq_sendint32(out, 0);
}
}

/*
* Read the relation info from stream and return as LogicalRepRelation.
*/
Expand Down Expand Up @@ -1250,6 +1288,10 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
return "INTERNAL DEPENDENCY";
case LOGICAL_REP_MSG_INTERNAL_RELATION:
return "INTERNAL RELATION";
}

/*
Expand Down
55 changes: 55 additions & 0 deletions src/backend/replication/logical/relation.c
Original file line number Diff line number Diff line change
Expand Up @@ -946,3 +946,58 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,

return InvalidOid;
}

/*
* Get the number of entries in the LogicalRepRelMap.
*/
int
logicalrep_get_num_rels(void)
{
if (LogicalRepRelMap == NULL)
return 0;

return hash_get_num_entries(LogicalRepRelMap);
}

/*
* Write all the remote relation information from the LogicalRepRelMapEntry to
* the output stream.
*/
void
logicalrep_write_all_rels(StringInfo out)
{
LogicalRepRelMapEntry *entry;
HASH_SEQ_STATUS status;

if (LogicalRepRelMap == NULL)
return;

hash_seq_init(&status, LogicalRepRelMap);

while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
logicalrep_write_internal_rel(out, &entry->remoterel);
}

/*
* Get the LogicalRepRelMapEntry corresponding to the given relid without
* opening the local relation.
*/
LogicalRepRelMapEntry *
logicalrep_get_relentry(LogicalRepRelId remoteid)
{
LogicalRepRelMapEntry *entry;
bool found;

if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();

/* Search for existing entry. */
entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
HASH_FIND, &found);

if (!found)
elog(DEBUG1, "no relation map entry for remote relation ID %u",
remoteid);

return entry;
}
Loading