Kafka topics
Pattern
Topics are generally named after their domain and follow this pattern:
{NAMESPACE}bci.cpm.{DOMAIN}[OPTIONAL.{SUB_DOMAIN 1}.{SUB_DOMAIN N}][OPTIONAL.{internal}|{tenant-ID}]
The namespace can be configured by the environment variable rm_kafka_topic_prefix (see also Kafka Topic Service-Configuration)
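The naming pattern can be illustrated with a small sketch. The helper `build_topic_name` and its parameters are hypothetical and not part of Rules Management; the sketch only assumes that the namespace is prepended without a separator and that the remaining parts are joined with dots, as the topic names on this page suggest.

```python
from typing import Optional, Sequence


def build_topic_name(
    domain: str,
    sub_domains: Sequence[str] = (),
    suffix: Optional[str] = None,
    namespace: str = "",
) -> str:
    """Compose a topic name following the documented naming pattern.

    `suffix` is either "internal" or a tenant ID. `namespace` is the
    configured topic prefix and is prepended without any separator
    (assumption based on the examples on this page).
    """
    parts = ["bci", "cpm", domain, *sub_domains]
    if suffix:
        parts.append(suffix)
    return namespace + ".".join(parts)


print(build_topic_name("valueProvider", ["measurementMessage"]))
print(build_topic_name("functionExecution", ["machineMessage"], suffix="internal"))
print(build_topic_name("ruleResult", suffix="TENANT_ID_A", namespace="DEV-"))
```

Because the namespace is prepended as-is, any separator (such as the trailing "-" in "DEV-") has to be part of the configured prefix value.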
Topics are usually created manually by the Kafka administrator (request via the IT-Service Portal for BD Kafka environments). The topic configurations are listed in the table below. Details on topic configuration can be found at: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#
However, if the environment variable rm_kafka_topic_auto_creation is set to true, the Rules Management applications will automatically create the topics at startup.
Topic list
| Rules Management Topic Name | Configuration Recommendations | Description | Publisher | Consumer Group Name |
|---|---|---|---|---|
| rm_kafka_topic_prefixbci.cpm.valueProvider.measurementMessage | | | | |
| rm_kafka_topic_prefixbci.cpm.valueProvider.machineMessage | | PPMP machine messages with rules for storing previous values and enriching rules with requested data (previous values) | | |
| rm_kafka_topic_prefixbci.cpm.functionExecution.measurementMessage.internal | | | | |
| rm_kafka_topic_prefixbci.cpm.functionExecution.machineMessage.internal | | | | |
| rm_kafka_topic_prefixbci.cpm.functionExecution.measurementMessage.[TENANT_ID] | | | | |
| rm_kafka_topic_prefixbci.cpm.functionExecution.machineMessage.[TENANT_ID] | | | | |
| rm_kafka_topic_prefixbci.cpm.ruleResult.internal | | | | |
| rm_kafka_topic_prefixbci.cpm.ruleResult.[TENANT_ID] | | | | |
| rm_kafka_topic_prefixbci.cpm.aggregatedRuleResult.positive | | | | |
| rm_kafka_topic_prefixbci.cpm.aggregatedRuleResult.changes | | | | |
| rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.1m | | | | |
| rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.30m | | | | |
| rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.12h | | | | |
Internal topics (state stores)
| Owner | Configuration Recommendations | Name | Description |
|---|---|---|---|
| Rule Result Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleResultAggregator-rm_kafka_topic_prefixbci.cpm.stateStore.rra.ruleResult-changelog | |
| Rule Value Provider | | rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.rawMeasurements-changelog | |
| Rule Value Provider | | rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.1MinMeasurements-changelog | |
| Rule Value Provider | | rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.30MinMeasurements-changelog | |
| Rule Value Provider | | rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.12HoursMeasurements-changelog | |
| Rule Value Provider | | rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.rawMachineMessages-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.1MinMeasurements-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.30MinMeasurements-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.12HoursMeasurements-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.1MinMeasurements-repartition | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.30MinMeasurements-repartition | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.12HoursMeasurements-repartition | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | |
| Rule Value Aggregator | | rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | |
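The internal topic names above match the Kafka Streams convention of naming changelog and repartition topics as application ID, store or operator name, and a fixed suffix, joined by hyphens. The sketch below reproduces such a name. The function `streams_internal_topic` is hypothetical, and reading the application ID as "prefix + bci.cpm.ruleValueProvider" is an inference from the listed names, not a documented setting.

```python
def streams_internal_topic(application_id: str, store_name: str, kind: str = "changelog") -> str:
    """Build an internal topic name as <application_id>-<store_name>-<kind>."""
    if kind not in ("changelog", "repartition"):
        raise ValueError(f"unsupported internal topic kind: {kind}")
    return f"{application_id}-{store_name}-{kind}"


# Assumed values, inferred from the topic names in the table above.
prefix = "bci-prep15."
application_id = prefix + "bci.cpm.ruleValueProvider"
store_name = prefix + "bci.cpm.stateStore.rvp.rawMeasurements"

print(streams_internal_topic(application_id, store_name))
```

This also explains why the prefix appears twice in most internal topic names: once as part of the application ID and once as part of the store name.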
Example Kafka topic list with minimum configuration
The following example lists the Kafka topics with a minimum configuration for an environment with RM_KAFKA_TOPIC_PREFIX="bci-prep15."; replace the prefix with your own hostname prefix:
| Topic | Cleanup policy | Replication Factor | Retention Time (days) | Retention Size (GB) | Restricted Access | Compression | Serialization Format | Partitions Count |
|---|---|---|---|---|---|---|---|---|
| bci-prep15.bci.cpm.aggregatedMeasurements.1m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedMeasurements.12h | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedMeasurements.30m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedRuleResult.changes | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedRuleResult.positive | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.machineMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.machineMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResult.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResult.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.valueProvider.machineMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.valueProvider.measurementMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResultAggregator-bci-prep15.bci.cpm.stateStore.rra.ruleResult-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMachineMessages-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
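For manual creation, a row of the table above could be translated into a kafka-topics.sh call roughly as sketched below. The `TopicSpec` class, the `create_command` helper and the bootstrap address are hypothetical. The sketch assumes that "Retention Size (GB)" is meant in GiB and maps to `retention.bytes`, which Kafka applies per partition; check both assumptions with your Kafka administrator. "Restricted Access" and "Serialization Format" are not topic-level settings and are therefore not part of the command.

```python
from dataclasses import dataclass
from typing import List, Optional

GIB = 1024 ** 3
DAY_MS = 24 * 60 * 60 * 1000


@dataclass
class TopicSpec:
    name: str
    cleanup_policy: str                    # "delete" or "compact"
    retention_days: Optional[int] = None   # "-" in the table -> None
    retention_gib: Optional[float] = None  # "-" in the table -> None
    partitions: int = 5
    replication_factor: int = 3
    compression: str = "gzip"


def create_command(spec: TopicSpec, bootstrap_server: str = "localhost:9092") -> List[str]:
    """Return the kafka-topics.sh argument list for one table row (nothing is executed)."""
    cmd = [
        "kafka-topics.sh", "--bootstrap-server", bootstrap_server,
        "--create", "--topic", spec.name,
        "--partitions", str(spec.partitions),
        "--replication-factor", str(spec.replication_factor),
        "--config", f"cleanup.policy={spec.cleanup_policy}",
        "--config", f"compression.type={spec.compression}",
    ]
    if spec.retention_days is not None:
        cmd += ["--config", f"retention.ms={spec.retention_days * DAY_MS}"]
    if spec.retention_gib is not None:
        cmd += ["--config", f"retention.bytes={int(spec.retention_gib * GIB)}"]
    return cmd


delete_topic = TopicSpec("bci-prep15.bci.cpm.ruleResult.internal", "delete",
                         retention_days=1, retention_gib=8.0)
compact_topic = TopicSpec(
    "bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog",
    "compact",
)
for spec in (delete_topic, compact_topic):
    print(" ".join(create_command(spec)))
```

The function only builds the argument list, so the output can be reviewed or attached to a service request before anything is run against a cluster.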
Authentication and authorization
To protect topic data from misuse, access to Kafka topics needs to be restricted. Therefore, a separate "Rules Management" Kafka user should be created (see Kafka Authentication (SSL)) and the following permissions need to be assigned via Kafka topic ACLs:
| Topic Name | Permissions "Rules Management"-Kafka-user | kafka-acls.sh option |
|---|---|---|
| bci.cpm.* | READ + WRITE | "--producer" + "--consumer" |
This means that the Kafka user has read and write access to all topics starting with "bci.cpm.".
Warning: See also the Kafka documentation on how to secure topics through prefixed ACLs (--resource-pattern-type prefixed).
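As a rough sketch, the prefixed ACLs for the "Rules Management" user could be assembled as follows. The principal name, the consumer group prefix and the bootstrap address are hypothetical placeholders; the consumer group names used by the applications are not listed on this page. Note that the `--consumer` convenience option of kafka-acls.sh requires a `--group`, and that `--resource-pattern-type prefixed` applies to every resource named in the same command.

```python
from typing import List


def prefixed_acl_commands(
    principal: str,
    topic_prefix: str,
    group_prefix: str,
    bootstrap_server: str = "localhost:9092",
) -> List[List[str]]:
    """Build kafka-acls.sh argument lists for prefixed producer and consumer ACLs."""
    base = [
        "kafka-acls.sh", "--bootstrap-server", bootstrap_server,
        "--add", "--allow-principal", principal,
        "--resource-pattern-type", "prefixed",
    ]
    producer = base + ["--producer", "--topic", topic_prefix]
    consumer = base + ["--consumer", "--topic", topic_prefix, "--group", group_prefix]
    return [producer, consumer]


# Hypothetical principal and group prefix, for illustration only.
acl_commands = prefixed_acl_commands("User:rules-management", "bci.cpm.", "bci.cpm.")
for command in acl_commands:
    print(" ".join(command))
```

If a topic prefix (namespace) is configured, the topic names no longer start with "bci.cpm.", so the prefix passed to `--topic` has to include the namespace as well.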
ACL: Kafka topics for external functions
When integrating external functions with Rules Management, a separate Kafka user should be created for every customer (tenant). This user only has access to the topics that are needed for the integration. This example uses the TENANT_ID of customer "A":
| Topic Name | Permissions tenant-specific-kafka-user | kafka-acls.sh options |
|---|---|---|
| bci.cpm.functionExecution.measurementMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.functionExecution.machineMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.ruleResult.TENANT_ID_A | READ + WRITE | "--producer" |
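The tenant-specific permissions from the table can be turned into kafka-acls.sh calls along these lines. This is a sketch only: the principal, the consumer group and the bootstrap address are hypothetical, and the option per topic is taken from the "kafka-acls.sh options" column as listed.

```python
from typing import Dict, List

# Topic template -> kafka-acls.sh convenience option, as listed in the table above.
TENANT_TOPIC_OPTIONS: Dict[str, str] = {
    "bci.cpm.functionExecution.measurementMessage.{tenant}": "--consumer",
    "bci.cpm.functionExecution.machineMessage.{tenant}": "--consumer",
    "bci.cpm.ruleResult.{tenant}": "--producer",
}


def tenant_acl_commands(
    tenant_id: str,
    principal: str,
    group: str,
    bootstrap_server: str = "localhost:9092",
) -> List[List[str]]:
    """Build one kafka-acls.sh argument list per tenant topic (nothing is executed)."""
    commands = []
    for template, option in TENANT_TOPIC_OPTIONS.items():
        cmd = [
            "kafka-acls.sh", "--bootstrap-server", bootstrap_server,
            "--add", "--allow-principal", principal,
            option, "--topic", template.format(tenant=tenant_id),
        ]
        if option == "--consumer":
            # The --consumer convenience option requires a consumer group.
            cmd += ["--group", group]
        commands.append(cmd)
    return commands


# Hypothetical principal and group for tenant "A".
tenant_commands = tenant_acl_commands("TENANT_ID_A", "User:tenant-a", "tenant-a-functions")
for command in tenant_commands:
    print(" ".join(command))
```

Because these ACLs use literal topic names rather than a prefix, a tenant user cannot reach the topics of another tenant or the internal topics.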
Kafka topic service-configuration
The described topics can be created automatically by the Rules Management applications themselves. For this purpose, the following environment variables are provided:
| Environment Variable | Type | Default | Description |
|---|---|---|---|
| RM_KAFKA_TOPIC_PREFIX | String | | To run multiple Rules Management instances (e.g. DEV, PROD, TEST), a prefix (namespace) can be used, e.g. with RM_KAFKA_TOPIC_PREFIX=DEV- the topic bci.cpm.valueProvider.measurementMessage → DEV-bci.cpm.valueProvider.measurementMessage |
| RM_KAFKA_TOPIC_AUTO_CREATION | boolean | false | Flag indicating whether the required topics should be created during application startup. |
| RM_KAFKA_TOPIC_PARTITIONS | integer | 50 | Number of partitions for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
| RM_KAFKA_TOPIC_REPLICAS | integer | 1 | Number of replicas for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
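To make the defaults concrete, the following sketch reads the four variables with the defaults from the table. How the Rules Management applications actually parse these values is not documented here; the `TopicSettings` class, the `load_topic_settings` function and the treatment of the boolean (only "true", case-insensitive, enables auto-creation) are assumptions for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TopicSettings:
    prefix: str
    auto_creation: bool
    partitions: int
    replicas: int


def load_topic_settings(env: Mapping[str, str] = os.environ) -> TopicSettings:
    """Read the topic settings from an environment mapping, using the documented defaults."""
    return TopicSettings(
        prefix=env.get("RM_KAFKA_TOPIC_PREFIX", ""),
        auto_creation=env.get("RM_KAFKA_TOPIC_AUTO_CREATION", "false").strip().lower() == "true",
        partitions=int(env.get("RM_KAFKA_TOPIC_PARTITIONS", "50")),
        replicas=int(env.get("RM_KAFKA_TOPIC_REPLICAS", "1")),
    )


defaults = load_topic_settings({})
dev = load_topic_settings({
    "RM_KAFKA_TOPIC_PREFIX": "DEV-",
    "RM_KAFKA_TOPIC_AUTO_CREATION": "true",
    "RM_KAFKA_TOPIC_PARTITIONS": "5",
    "RM_KAFKA_TOPIC_REPLICAS": "3",
})
print(defaults)
print(dev.prefix + "bci.cpm.valueProvider.measurementMessage")
```

Note that the auto-creation defaults (50 partitions, 1 replica) differ from the minimum configuration example (5 partitions, replication factor 3), so the partition and replica variables are worth setting explicitly when auto-creation is enabled.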