RabbitMQ
Naming conventions
This concept extends MsgSpec-06 Messaging Infrastructure.
| Type | Pattern | Example |
|---|---|---|
| Exchange | Exchanges are described as domain models: x.{SCS_NAME}.{DOMAIN MODEL}[OPTIONAL .{SUB_DOMAIN_MODEL 1}.{SUB_DOMAIN_MODEL N}] | 1. x.cm.opp.v0.message.measurement 2. x.cm.ruleResult.positive |
| Queue | Queues are described as actions (domain model from exchange + operation from routing key): q.{SCS_NAME}.{SERVICE_NAME}.{DOMAIN MODEL}[OPTIONAL .{SUB_DOMAIN_MODEL 1}.{SUB_DOMAIN_MODEL N}] + {ROUTING KEY} (!) | Exchange: x.tm.ticket; Exchange: x.cpm.ppmp.machineMsg.original; Exchange: x.cpm.ppmp.machineMsg.preprocessed |
| Routing key | Routing keys are described as operations that always relate to the last element of the domain model from the exchange pattern: {EVENT ACTION (verb as past tense)} | Exchange: tm.ticket; Exchange: cpm.ppmp.machineMsg.original; Exchange: cpm.ppmp.machineMsg.preprocessed |
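The patterns above can be sketched as two small name builders. This is only an illustration: the function names `exchange_name` and `queue_name` are hypothetical, and joining the routing key to the queue name with a dot is an assumption derived from queue names such as q.cm.rs.facility.created on this page.

```python
def exchange_name(scs: str, *domain_models: str) -> str:
    """Build x.{SCS_NAME}.{DOMAIN MODEL}[.{SUB_DOMAIN_MODEL}...]."""
    if not domain_models:
        raise ValueError("at least one domain model is required")
    return ".".join(("x", scs, *domain_models))


def queue_name(scs: str, service: str, *domain_models: str, routing_key: str) -> str:
    """Build q.{SCS_NAME}.{SERVICE_NAME}.{DOMAIN MODEL}[...] + {ROUTING KEY}.

    Assumption: the routing key is appended as a further dot-separated element.
    """
    if not domain_models:
        raise ValueError("at least one domain model is required")
    return ".".join(("q", scs, service, *domain_models, routing_key))


if __name__ == "__main__":
    print(exchange_name("cm", "ruleResult", "positive"))
    print(queue_name("cm", "rs", "facility", routing_key="created"))
```

Keeping the routing key as a keyword-only argument makes it harder to confuse the last sub-domain model with the operation.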
Inter-SCS communication
Business events:
As the Business Event Communication is not yet defined, we need a quick and probably temporary solution to realize asynchronous inter-system communication using business events (complete entities, not only the IDs carried by Integration Events). The following modules need to communicate with each other via RabbitMQ:
Incoming Message Communication
| Publisher | Exchange | Type | Routing Key | Payload | Consumer | Queue Name | Description |
|---|---|---|---|---|---|---|---|
| OPP-Publisher | x.connectivity.opp.v09.machineEquipment.measurementTimeSeries.v09 | topic | received | JSON Array - batch | Condition-Monitoring-Core | q.cm.core.opp.v09.machineEquipment.measurementTimeSeries.v09 | receive OPP measurement messages |
| OPP-Publisher | x.connectivity.opp.v09.machineEquipment.machine.v09 | topic | received | JSON Array - batch | Condition-Monitoring-Core | q.cm.core.opp.v09.machineEquipment.machine.v09 | receive OPP machine messages |
| Rule-Service-App | x.cm.ruleResult.positive | topic | created | JSON Array - batch | Condition-Monitoring-Core | q.cm.core.ruleResult.positive | Receiving rule results to save as event in Influx and send deviations if activated to Deviation Processor |
| Rule-Service-App | x.cm.ppmp.v2.message.measurement | topic | created | JSON Array - batch | Condition-Monitoring-Core | q.cm.core.ppmp.v2.message.measurement | Receiving measurement message (for a specific use case) to save in influx |
| Condition-Monitoring-Core | x.cm.ppmp.v2internal.message.measurement | topic | enriched | JSON Array - batch | Rule-Service-App | q.cm.rs.ppmp.measurement.enriched | Receiving PPMPv2 enriched measurement message as an input for rule executions |
| Condition-Monitoring-Core | x.cm.ppmp.v2internal.message.machine | topic | enriched | JSON Array - batch | Rule-Service-App | q.cm.rs.ppmp.machineMsg.enriched | Receiving PPMPv2 enriched machine message as an input for rule executions |
| Condition-Monitoring-Core | x.cm.sdm.request | topic | startAnalysis | JSON | Sequence-Detection-Module | q.ai.core.sequenceDetectionRequest | Requests in order to start analysis. |
| Sequence-Detection-Module | x.ai.sdm.result | topic | analysisResults | JSON | Condition-Monitoring-Core | q.cm.core.analysisResults | Replies with analysis results. |
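All exchanges in this table are of type topic, so a message reaches a queue only if its routing key matches the key the queue was bound with. The following broker-free sketch mimics the standard AMQP topic rules (`*` matches exactly one word, `#` matches zero or more words). The `BINDINGS` list and the `route` helper are hypothetical, and it is an assumption that each queue is bound with exactly the key shown in the Routing Key column.

```python
from typing import List, Tuple


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under AMQP topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".") if routing_key else []

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# (exchange, binding key, queue): three rows taken from the table above.
BINDINGS: List[Tuple[str, str, str]] = [
    ("x.cm.ruleResult.positive", "created", "q.cm.core.ruleResult.positive"),
    ("x.cm.ppmp.v2internal.message.measurement", "enriched", "q.cm.rs.ppmp.measurement.enriched"),
    ("x.cm.sdm.request", "startAnalysis", "q.ai.core.sequenceDetectionRequest"),
]


def route(exchange: str, routing_key: str) -> List[str]:
    """List the queues that would receive a message published to exchange."""
    return [
        queue
        for bound_exchange, key, queue in BINDINGS
        if bound_exchange == exchange and topic_matches(key, routing_key)
    ]
```

For example, `route("x.cm.sdm.request", "startAnalysis")` yields the Sequence-Detection-Module queue, while the same exchange with a different routing key yields no queue at all.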
Outgoing Message Communication
Business events are communicated by binding consumer queues directly to the exchanges of other SCS. To be able to bind a queue to the exchanges of another SCS, the user must be granted the related permissions, as shown below:
CM User RabbitMQ Configuration:
| Virtual host | Configure regexp | Write regexp | Read regexp |
|---|---|---|---|
| / | `^(q\|x){1}\..(tm\|rm\|cm)\..+$` | `^(q\|x){1}\.(tm\|rm\|cm\|nexeed)\..+$` | `^(q\|x){1}\.(tm\|rm\|cm\|nexeed)\..+$` |
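In RabbitMQ, binding a queue to an exchange requires write permission on the queue and read permission on the exchange. The sketch below checks resource names against the write/read pattern listed above using Python's `re` module. This only approximates the broker's own regular-expression engine, and the helper name `may_read_or_write` is hypothetical.

```python
import re

# Write/read pattern copied from the CM user configuration above.
WRITE_READ_PATTERN = re.compile(r"^(q|x){1}\.(tm|rm|cm|nexeed)\..+$")


def may_read_or_write(resource_name: str) -> bool:
    """Return True if the exchange or queue name matches the write/read pattern."""
    return WRITE_READ_PATTERN.match(resource_name) is not None


if __name__ == "__main__":
    for name in ("x.nexeed.integration", "q.cm.core.ruleResult.positive", "amq.direct"):
        print(name, may_read_or_write(name))
```

A check like this can be run over all planned exchange and queue names before a deployment to spot names that the configured permissions would reject.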
Messaging view for automatic ticket creation can be found here: Automatic Ticket Creation
Integration event
Incoming Message Communication
| Publisher | Exchange | Type | Routing Key | Payload | Consumer | Exchange Name | Queue Name |
|---|---|---|---|---|---|---|---|
| MDM | x.nexeed.integration | fanout | # | JSON Object (Integration Event) | (Rules Management) Rule-Service-App | x.cm.integration | q.cm.rs.facility.created, q.cm.rs.facility.modified, q.cm.rs.facility.removed, q.cm.rs.device.created, q.cm.rs.device.modified, q.cm.rs.device.removed, q.cm.rs.deviceType.created, q.cm.rs.deviceType.modified, q.cm.rs.deviceType.removed, q.cm.rs.measuringPoint.modified, q.cm.rs.measuringPoint.removed, q.cm.rs.relation.created, q.cm.rs.relation.removed |
| MDM | x.nexeed.integration | fanout | # | JSON Object (Integration Event) | (Condition Monitoring) Condition-Monitoring-Core | x.cm.integration (exchange-to-exchange binding as defined in MsgSpec-06 Messaging Infrastructure) | q.cm.core.facility.created, q.cm.core.facility.modified, q.cm.core.facility.removed, q.cm.core.device.created, q.cm.core.device.modified, q.cm.core.device.removed, q.cm.core.relation.created, q.cm.core.relation.removed, q.cm.core.deviceType.created, q.cm.core.deviceType.modified, q.cm.core.deviceType.removed, q.cm.core.measuringPoint.modified, q.cm.core.measuringPoint.removed |
The messaging view for MDM integration events can be found here: MDM Integration Events. The integration events for devices, facilities, device types, measuring points and relations are specified in the Messaging Concept - MDM.
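The integration-event queues listed above follow one scheme per consuming service: q.cm.{service}.{entity}.{action}. As a rough sketch, the full list can be generated from a mapping of entities to the actions that appear in the table. The names `ACTIONS_BY_ENTITY` and `integration_queues` are hypothetical and only illustrate the scheme.

```python
from typing import Dict, List, Tuple

# Entity -> actions, as they appear in the queue lists above.
ACTIONS_BY_ENTITY: Dict[str, Tuple[str, ...]] = {
    "facility": ("created", "modified", "removed"),
    "device": ("created", "modified", "removed"),
    "deviceType": ("created", "modified", "removed"),
    "measuringPoint": ("modified", "removed"),
    "relation": ("created", "removed"),
}


def integration_queues(service: str) -> List[str]:
    """Build the integration-event queue names for one service, e.g. 'rs' or 'core'."""
    return [
        f"q.cm.{service}.{entity}.{action}"
        for entity, actions in ACTIONS_BY_ENTITY.items()
        for action in actions
    ]


if __name__ == "__main__":
    for name in integration_queues("rs"):
        print(name)
```

Generating the names from one mapping keeps the Rule-Service-App and Condition-Monitoring-Core queue lists aligned when an entity or action is added.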
Inner SCS communication
The communication between the services of Condition Monitoring Core is listed here:
| Publisher | Exchange | Type | Routing Key | API Documentation | Consumer | Queue | Queue Description |
|---|---|---|---|---|---|---|---|
| Condition-Monitoring-Core | x.cm.device | topic | updateCache | JSON | Condition-Monitoring-Core | q.cm.core.device.updateCache-${random.uuid} | Updates device cache |
| Condition-Monitoring-Core | x.cm.deviceType | topic | updateCache | JSON | Condition-Monitoring-Core | q.cm.core.deviceType.updateCache-${random.uuid} | Updates device type cache |
| Condition-Monitoring-Core | x.cm.facility | topic | updateCache | JSON | Condition-Monitoring-Core | q.cm.core.facility.updateCache-${random.uuid} | Updates facility cache |
| Condition-Monitoring-Core | x.cm.tenant | topic | updateCache | JSON | Condition-Monitoring-Core | q.cm.core.tenant.updateCache-${random.uuid} | Updates tenant cache |
| Rule-Service-App | x.cm.rm.facility | topic | updateCache | JSON | Rule-Service-App | q.cm.rs.facility.updateCache-${random.uuid} | Updates facility cache |
| Rule-Service-App | x.cm.rule.created | topic | # | JSON | Rule-Service-App | q.cm.rs.rule.created-${random.uuid} | Add rule to cache |
| Rule-Service-App | x.cm.rule.updated | topic | # | JSON | Rule-Service-App | q.cm.rs.rule.updated-${random.uuid} | Update rule in cache |
| Rule-Service-App | x.cm.rule.deleted | topic | # | JSON | Rule-Service-App | q.cm.rs.rule.deleted-${random.uuid} | Delete rule from cache |
| Rule-Service-App | x.cm.measurementRuleExecutionMessage | topic | created | JSON Array - batch | Stateful-function-Executor | q.cm.sfe.measurementRuleExecutionMsg.received | Receiving measurementExecutionMessage (measurement message + related rules) |
| Rule-Service-App | x.cm.machineRuleExecutionMessage | topic | created | JSON Array - batch | Stateful-function-Executor | q.cm.sfe.machineRuleExecutionMsg.received | Receiving MachineExecutionMessage (machine message + related rules) |
| Rule-Service-App | x.cm.rm.tenant | topic | updateCache | JSON | Rule-Service-App | q.cm.rs.tenant.updateCache-${random.uuid} | Updates tenant cache |
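Several queue names above end in `-${random.uuid}`, which looks like a Spring Boot random-value placeholder resolved at application start. Presumably this gives every service instance its own queue, so that each instance receives every cache update instead of competing for messages on a shared queue. That reading is an assumption, not something this page states. A minimal sketch of the naming, with the hypothetical helper `per_instance_queue`:

```python
import uuid


def per_instance_queue(base_name: str) -> str:
    """Append a random UUID so each running instance gets a distinct queue name."""
    return f"{base_name}-{uuid.uuid4()}"


if __name__ == "__main__":
    # Two instances of the same service end up with two different queues.
    print(per_instance_queue("q.cm.core.device.updateCache"))
    print(per_instance_queue("q.cm.core.device.updateCache"))
```

Because such names change on every start, queues of this kind are typically declared so that they are cleaned up when the instance disconnects. Whether that is the case here is not specified on this page.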