Kafka topics
Pattern
Topics are generally named after their domain and follow this pattern:
{NAMESPACE}bci.cpm.{DOMAIN}[OPTIONAL.{SUB_DOMAIN 1}.{SUB_DOMAIN N}][OPTIONAL.{internal}|{tenant-ID}]
The namespace can be configured with the environment variable rm_kafka_topic_prefix (see also Kafka Topic Service-Configuration).
Topics are usually created manually by the Kafka administrator (for BD Kafka environments, request them via the IT Service Portal). The recommended topic configurations are listed in the table below. Details on the individual topic configuration options can be found at: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#
However, if the environment variable rm_kafka_topic_auto_creation is set to true, the Rules Management applications will automatically create the topics at startup.
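The naming pattern above can be illustrated with a minimal Python sketch. The function name `build_topic_name` and its parameters are hypothetical and only serve to show how the namespace, domain, optional sub-domains and optional suffix are combined; this is not the code used by the Rules Management applications.

```python
from typing import Optional, Sequence


def build_topic_name(
    namespace: str,
    domain: str,
    sub_domains: Sequence[str] = (),
    suffix: Optional[str] = None,
) -> str:
    """Assemble a topic name following the documented pattern.

    namespace   -- topic prefix, e.g. "bci-prep15." (may be empty)
    domain      -- e.g. "functionExecution"
    sub_domains -- optional sub-domains, e.g. ["measurementMessage"]
    suffix      -- optional "internal" or a tenant ID
    """
    parts = ["bci", "cpm", domain, *sub_domains]
    if suffix:
        parts.append(suffix)
    # The namespace is prepended verbatim; it carries its own separator
    # (for example the trailing "." in "bci-prep15.").
    return namespace + ".".join(parts)


print(build_topic_name("bci-prep15.", "functionExecution", ["measurementMessage"], "internal"))
# prints "bci-prep15.bci.cpm.functionExecution.measurementMessage.internal"
```

Because the namespace is prepended without an extra separator, a prefix such as "bci-prep15." has to include its trailing dot.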
Topic list
| Rules Management Topic Name | Configuration Recommendations | Description | Publisher | Consumer Group Name |
|---|---|---|---|---|
| {rm_kafka_topic_prefix}bci.cpm.valueProvider.measurementMessage | | | | |
| {rm_kafka_topic_prefix}bci.cpm.valueProvider.machineMessage | | PPMP machine messages with rules for storing previous values and enriching rules with requested data (previous values) | | |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.measurementMessage.internal | | | | |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.machineMessage.internal | | | | |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.measurementMessage.[TENANT_ID] | | | | |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.machineMessage.[TENANT_ID] | | | | |
| {rm_kafka_topic_prefix}bci.cpm.ruleResult.internal | | | | |
| {rm_kafka_topic_prefix}bci.cpm.ruleResult.[TENANT_ID] | | | | |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedRuleResult.positive | | | | |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedRuleResult.changes | | | | |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.1m | | | | |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.30m | | | | |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.12h | | | | |
Internal topics (state stores)
| Owner | Configuration Recommendations | Name | Description |
|---|---|---|---|
| Rule Result Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleResultAggregator-{rm_kafka_topic_prefix}bci.cpm.stateStore.rra.ruleResult-changelog | |
| Rule Value Provider | | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.rawMeasurements-changelog | |
| Rule Value Provider | | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.1MinMeasurements-changelog | |
| Rule Value Provider | | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.30MinMeasurements-changelog | |
| Rule Value Provider | | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.12HoursMeasurements-changelog | |
| Rule Value Provider | | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.rawMachineMessages-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.1MinMeasurements-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.30MinMeasurements-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.12HoursMeasurements-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.1MinMeasurements-repartition | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.30MinMeasurements-repartition | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.12HoursMeasurements-repartition | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | |
| Rule Value Aggregator | | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | |
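Most names in this table are consistent with the Kafka Streams convention for internal topics, `<application.id>-<store name>-changelog` and `<application.id>-<name>-repartition`. The following Python sketch reproduces such names under the assumption that both the application ID and the store name carry the configured prefix; the helper `internal_topic_name` is hypothetical and not part of Rules Management.

```python
def internal_topic_name(prefix: str, application: str, store: str, kind: str = "changelog") -> str:
    """Derive an internal topic name as <application.id>-<store name>-<kind>.

    prefix      -- configured topic prefix, e.g. "bci-prep15."
    application -- e.g. "ruleValueProvider"
    store       -- e.g. "stateStore.rvp.rawMeasurements"
    kind        -- "changelog" or "repartition"
    """
    if kind not in ("changelog", "repartition"):
        raise ValueError(f"unsupported internal topic kind: {kind}")
    # Assumption: application ID and store name are both prefixed.
    application_id = f"{prefix}bci.cpm.{application}"
    store_name = f"{prefix}bci.cpm.{store}"
    return f"{application_id}-{store_name}-{kind}"


print(internal_topic_name("bci-prep15.", "ruleValueProvider", "stateStore.rvp.rawMeasurements"))
# prints "bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog"
```

The `*_aggregation_window_closed-store` topics do not repeat the prefix in the store name, so this sketch does not cover them.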
Example Kafka topic list with minimum configuration
The following table lists example Kafka topics with a minimum configuration for an environment with RM_KAFKA_TOPIC_PREFIX="bci-prep15."; replace the prefix with your own hostname prefix:
| Topic | Cleanup policy | Replication Factor | Retention Time (days) | Retention Size (GB) | Restricted Access | Compression | Serialization Format | Partitions Count | Message timestamp Type | Access & Authorization |
|---|---|---|---|---|---|---|---|---|---|---|
| bci-prep15.bci.cpm.aggregatedMeasurements.1m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.aggregatedMeasurements.12h | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.aggregatedMeasurements.30m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.aggregatedRuleResult.changes | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.aggregatedRuleResult.positive | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.functionExecution.machineMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.functionExecution.machineMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleResult.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleResult.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.valueProvider.machineMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.valueProvider.measurementMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleResultAggregator-bci-prep15.bci.cpm.stateStore.rra.ruleResult-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMachineMessages-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 | CreateTime | Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com |
Authentication and authorization
To protect topic data from misuse, access to the Kafka topics needs to be restricted. A separate "Rules Management" Kafka user should therefore be created (see Kafka Authentication (SSL)), and the following permissions need to be assigned via Kafka topic ACLs:
| Topic Name | Permissions "Rules Management"-Kafka-user | kafka-acls.sh option |
|---|---|---|
| bci.cpm.* | READ + WRITE | "--producer" + "--consumer" |
This means that the Kafka user has read and write access to all topics starting with "bci.cpm.".
Note: See also the Kafka documentation on how to secure topics through prefixed ACLs (--resource-pattern-type prefixed).
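As a rough illustration of the prefixed ACL described above, the following Python sketch assembles a `kafka-acls.sh` command line. The function name, the principal `User:rules-management`, the consumer group and the bootstrap server are hypothetical placeholders; adapt them to your environment and verify the command against the Kafka documentation before running it.

```python
import shlex
from typing import List


def prefixed_acl_command(bootstrap_server: str, principal: str, topic_prefix: str, group: str) -> List[str]:
    """Build kafka-acls.sh arguments granting producer and consumer access
    to all topics whose names start with topic_prefix."""
    return [
        "kafka-acls.sh",
        "--bootstrap-server", bootstrap_server,
        "--add",
        "--allow-principal", principal,
        "--producer",
        "--consumer",
        # With a prefixed pattern the literal prefix is passed, not "bci.cpm.*".
        "--topic", topic_prefix,
        # --consumer requires a consumer group to be named.
        "--group", group,
        # Assumption: the pattern type is given once per command and therefore
        # also applies to the group name.
        "--resource-pattern-type", "prefixed",
    ]


command = prefixed_acl_command(
    "localhost:9093",          # hypothetical broker address
    "User:rules-management",   # hypothetical principal
    "bci.cpm.",
    "rules-management",        # hypothetical consumer group
)
print(shlex.join(command))
```

If a topic prefix (namespace) is configured, the value passed to `--topic` would need to include it, for example "bci-prep15.bci.cpm.".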
ACL: Kafka topics for external functions
When integrating external functions with Rules Management, a separate Kafka user should be created for each customer (tenant). This user only has access to the topics needed for the integration. The following example uses TENANT_ID_A as the tenant ID of customer "A":
| Topic Name | Permissions tenant-specific-kafka-user | kafka-acls.sh options |
|---|---|---|
| bci.cpm.functionExecution.measurementMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.functionExecution.machineMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.ruleResult.TENANT_ID_A | READ + WRITE | "--producer" |
Kafka topic service-configuration
The described topics can be created automatically by the Rules Management applications themselves. The following environment variables are provided for this purpose:
| Environment Variable | Type | Default | Description |
|---|---|---|---|
| RM_KAFKA_TOPIC_PREFIX | String | | To run multiple Rules Management instances (e.g. DEV, PROD, TEST), a prefix (namespace) can be used. For example, with RM_KAFKA_TOPIC_PREFIX=DEV- the topic bci.cpm.valueProvider.measurementMessage becomes DEV-bci.cpm.valueProvider.measurementMessage |
| RM_KAFKA_TOPIC_AUTO_CREATION | boolean | false | Flag indicating whether the required topics should be created during application startup. |
| RM_KAFKA_TOPIC_PARTITIONS | integer | 50 | Number of partitions for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
| RM_KAFKA_TOPIC_REPLICAS | integer | 1 | Number of replicas for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
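To make the defaults in the table concrete, the following Python sketch reads the four variables from a mapping such as `os.environ`. The class `TopicSettings` and the function `load_topic_settings` are hypothetical, and the empty default for the prefix is an assumption because the table does not state one; the actual parsing in the Rules Management applications may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TopicSettings:
    prefix: str
    auto_creation: bool
    partitions: int
    replicas: int


def load_topic_settings(env: Mapping[str, str] = os.environ) -> TopicSettings:
    """Read the topic-related variables, falling back to the documented defaults."""
    return TopicSettings(
        # Assumption: no prefix is applied when the variable is unset.
        prefix=env.get("RM_KAFKA_TOPIC_PREFIX", ""),
        auto_creation=env.get("RM_KAFKA_TOPIC_AUTO_CREATION", "false").strip().lower() == "true",
        # Partitions and replicas only matter when auto creation is enabled.
        partitions=int(env.get("RM_KAFKA_TOPIC_PARTITIONS", "50")),
        replicas=int(env.get("RM_KAFKA_TOPIC_REPLICAS", "1")),
    )


settings = load_topic_settings({"RM_KAFKA_TOPIC_PREFIX": "DEV-", "RM_KAFKA_TOPIC_AUTO_CREATION": "true"})
print(settings.prefix + "bci.cpm.valueProvider.measurementMessage")
# prints "DEV-bci.cpm.valueProvider.measurementMessage"
```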
How To: Create Kafka Topics and Configure for On Prem BD Environments
The following step-by-step guide describes how to create Kafka topics and configure them for an on-prem environment:
Step 1: Requesting Kafka Topic Creation
All Kafka topic creation, modification, or deletion requests must go through the IT Service Portal.
Access the IT Service Portal
- Navigate to the IT Service Portal.
- Search for and select the service: Apache Kafka Topics (Create/Change/Delete).
Fill out the Request Form
- Refer to the example Kafka topic list in your operations manual for guidance.
- Crucially, you need to change the prefix of the topics according to your server hostname. This ensures topic uniqueness and proper organization.
Cleanup Policy Considerations
- For topics requiring a cleanup.policy of delete (the default for most log-like topics), create one separate request for all such topics.
- For topics requiring a cleanup.policy of compact (for key-value store-like topics where only the latest value for a key is retained), create another separate request for all such topics.
For more details about the Kafka topics creation/change/delete process please refer to: Kafka Topic Ordering - Data Streaming Platform
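The split into one request per cleanup policy can be sketched in Python as follows. The rule used here (topics ending in `-changelog` or `-repartition` are compacted, all others use delete) simply mirrors the example topic list in this document; the function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def cleanup_policy_for(topic: str) -> str:
    """Return the cleanup policy used for this kind of topic in the example list."""
    if topic.endswith("-changelog") or topic.endswith("-repartition"):
        return "compact"
    return "delete"


def group_requests(topics: Iterable[str]) -> Dict[str, List[str]]:
    """Group topics so that each cleanup policy becomes one portal request."""
    requests: Dict[str, List[str]] = defaultdict(list)
    for topic in topics:
        requests[cleanup_policy_for(topic)].append(topic)
    return dict(requests)


topics = [
    "bci-prep15.bci.cpm.ruleResult.internal",
    "bci-prep15.bci.cpm.aggregatedMeasurements.1m",
    "bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog",
]
for policy, names in group_requests(topics).items():
    print(policy, len(names))
```

Each key of the returned dictionary corresponds to one request in the IT Service Portal.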
Step 2: Configuring Your K3s/Radium Environment
Once your Kafka topics are approved and created, you need to configure your application environment to connect to and use them. This is done by modifying your custom-values.yml file.
Add Local Kafka Configuration
Under the cm: section, add the following local configuration:
    cm:
      local:
        kafkaClientSSLKeyPassword: "<pwd>"
        kafkaClientSSLKeystorePassword: "<pwd>"
        kafkaTopicPrefix: "bci-prep15."
        kafkaTopicPartitions: "5"
        kafkaTopicAutoCreation: false
- kafkaClientSSLKeyPassword and kafkaClientSSLKeystorePassword: Replace "<pwd>" with the actual passwords for your Kafka client SSL key and keystore.
- kafkaTopicPrefix: This is critically important. Ensure this prefix exactly matches the prefix you used when requesting topic creation through the IT Service Portal. For example, if your server hostname is bci-prep16, your topic prefix would be "bci-prep16.".
- kafkaTopicPartitions: Specify the desired number of partitions for your topics. This should ideally align with what was requested during topic creation.
- kafkaTopicAutoCreation: false: This setting is crucial. Since you are requesting topics through a formal process, you want to prevent your application from attempting to auto-create topics, which could lead to inconsistencies or errors.
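Since a mismatch between kafkaTopicPrefix and the requested topic names is easy to overlook, a small check along the following lines may help before deployment. This is a minimal Python sketch; the function `topics_missing_prefix` is hypothetical and assumes that every requested topic name starts with the prefix followed by "bci.cpm.".

```python
from typing import Iterable, List


def topics_missing_prefix(prefix: str, requested_topics: Iterable[str]) -> List[str]:
    """Return the requested topics that do not start with '<prefix>bci.cpm.'."""
    expected_start = prefix + "bci.cpm."
    return [topic for topic in requested_topics if not topic.startswith(expected_start)]


requested = [
    "bci-prep16.bci.cpm.ruleResult.internal",
    "bci-prep15.bci.cpm.valueProvider.machineMessage",  # still carries the example prefix
]
print(topics_missing_prefix("bci-prep16.", requested))
# prints ['bci-prep15.bci.cpm.valueProvider.machineMessage']
```

An empty result means that all requested topic names match the configured prefix.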
Verify Kafka Cluster Configuration (Existing)
Ensure the custom-values.yml file also contains the correct Kafka cluster configuration under the global and modules sections, as shown in the example:
    global:
      serverInstances:
        externalkafka:
          adminPassword: ""
          hosts:
            - <host1>
            - <host2>
            - <host3>
          port: 9093
          tls: true
          type: KAFKA
          default: false
    modules:
      cm:
        enabled: true
        ruleManagementLightEnabled: false
        messaging:
          kafkarm:
            serverInstance: externalkafka