preview 4.10.0

Kafka topics

Pattern

Topics are generally named after their domain and follow this pattern:

{NAMESPACE}bci.cpm.{DOMAIN}[OPTIONAL.{SUB_DOMAIN 1}.{SUB_DOMAIN N}][OPTIONAL.{internal}|{tenant-ID}]

The namespace can be configured via the environment variable rm_kafka_topic_prefix (see also Kafka topic service-configuration).
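The naming pattern can be illustrated with a small helper. This is only a sketch: the function name `build_topic_name` and its parameters are hypothetical and not part of Rules Management; only the `bci.cpm` base and the segment order come from the pattern above.

```python
def build_topic_name(domain, sub_domains=(), scope=None, namespace=""):
    """Compose a topic name following the documented pattern.

    scope is "internal", a tenant ID, or None when the topic has no
    trailing scope segment. namespace stands for the configured prefix.
    """
    parts = ["bci", "cpm", domain, *sub_domains]
    if scope:
        parts.append(scope)
    # The namespace is prepended verbatim, without an extra separator.
    return namespace + ".".join(parts)


# prints bci.cpm.valueProvider.measurementMessage
print(build_topic_name("valueProvider", ["measurementMessage"]))
# prints DEV-bci.cpm.functionExecution.machineMessage.internal
print(build_topic_name("functionExecution", ["machineMessage"],
                       scope="internal", namespace="DEV-"))
```

Because the namespace is concatenated as-is, any separator (such as the trailing `-` or `.`) has to be part of the prefix value itself.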

Topics are usually created manually by the Kafka administrator (for BD Kafka environments, request them via the IT-Service Portal). The recommended topic configurations are listed in the table below. Details on topic configuration can be found at: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#

However, if the environment variable rm_kafka_topic_auto_creation is set to true, the Rules Management applications automatically create the topics at startup.

Topic list

| Rules Management Topic Name | Configuration Recommendations | Description | Publisher | Consumer Group Name |
| --- | --- | --- | --- | --- |
| {rm_kafka_topic_prefix}bci.cpm.valueProvider.measurementMessage | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | PPMP messages with rules for storing previous values/aggregations and enriching rules with requested data (previous values/aggregations) | Rule Service App (Context Router) | Rule Value Provider: name=bci.cpm.ruleValueProvider; Rule Value Aggregator: name=bci.cpm.ruleValueAggregator |
| {rm_kafka_topic_prefix}bci.cpm.valueProvider.machineMessage | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | PPMP machine messages with rules for storing previous values and enriching rules with requested data (previous values) | Rule Service App (Context Router) | Rule Value Provider: name=bci.cpm.ruleValueProvider; Rule Value Aggregator: name=bci.cpm.ruleValueAggregator |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.measurementMessage.internal | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Executing standard functions for rules (internal), only PPMP measurement messages | Rule Service App (Context Router); Rule Value Provider | Rule Function Executor: name=bci.cpm.ruleFunctionExecutor |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.machineMessage.internal | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Executing standard functions for rules (internal), only PPMP machine messages | Rule Service App (Context Router); Rule Value Provider | Rule Function Executor: name={rm_kafka_topic_prefix}bci.cpm.ruleFunctionExecutor |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.measurementMessage.[TENANT_ID] | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Executing custom functions for rules (external), only PPMP measurement messages. One exclusive topic per customer is planned in the beginning. | Rule Service App (Context Router); Rule Value Provider | External function consumer by customer (only PPMP measurement messages) |
| {rm_kafka_topic_prefix}bci.cpm.functionExecution.machineMessage.[TENANT_ID] | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Executing custom functions for rules (external). One exclusive topic per customer is planned in the beginning. | Rule Service App (Context Router); Rule Value Provider | External function consumer by customer (only PPMP machine messages) |
| {rm_kafka_topic_prefix}bci.cpm.ruleResult.internal | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Aggregating all rule results from standard rules (internal) | Rule Function Executor | Rule Result Aggregator: name=bci.cpm.ruleResultAggregator |
| {rm_kafka_topic_prefix}bci.cpm.ruleResult.[TENANT_ID] | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Aggregating all rule results from custom rules (external) | External function by customer | Rule Result Aggregator: name=bci.cpm.ruleResultAggregator |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedRuleResult.positive | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Only positive rules | Rule Result Aggregator | Rule Service App (Context Router): name=bci.cpm.ruleService |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedRuleResult.changes | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | Only rule result changes (current state of the rule: rule is triggered or not) | Rule Result Aggregator | Rule Service App (Context Router): name=bci.cpm.ruleService |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.1m | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | 1-minute aggregations for raw measurements (only measuring points where a rule exists) | Rule Value Aggregator | Rule Value Provider: name=bci.cpm.ruleValueProvider; Rule Service App (Context Router): name=bci.cpm.ruleService |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.30m | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | 30-minute aggregations for raw measurements (only measuring points where a rule exists) | Rule Value Aggregator | Rule Value Provider: name=bci.cpm.ruleValueProvider; Rule Service App (Context Router): name=bci.cpm.ruleService |
| {rm_kafka_topic_prefix}bci.cpm.aggregatedMeasurements.12h | partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=delete; retention.ms=86400000 (1 day) | 12-hour aggregations for raw measurements (only measuring points where a rule exists) | Rule Value Aggregator | Rule Value Provider: name=bci.cpm.ruleValueProvider; Rule Service App (Context Router): name=bci.cpm.ruleService |
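Where topics are created by hand on a self-administered cluster, the recommended settings map directly onto the standard `kafka-topics.sh` options. The following sketch only builds the command string and does not run it. The function name and the broker address are placeholders, and on managed environments the request process described above applies instead.

```python
import shlex

# Recommended settings for the regular (non-changelog) topics in the topic list.
RECOMMENDED_CONFIGS = {"cleanup.policy": "delete", "retention.ms": "86400000"}


def create_topic_command(topic, bootstrap_server, partitions=5, replicas=3,
                         configs=None):
    """Return a kafka-topics.sh invocation as one shell-safe string."""
    configs = RECOMMENDED_CONFIGS if configs is None else configs
    args = [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_server,
        "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replicas),
    ]
    for key, value in configs.items():
        args += ["--config", f"{key}={value}"]
    return shlex.join(args)


# "kafka.example.internal:9092" is a placeholder broker address.
print(create_topic_command("bci.cpm.ruleResult.internal",
                           "kafka.example.internal:9092"))
```

Lower the `replicas` argument on clusters with fewer than three brokers, in line with the note on cluster size.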

Internal topics (state stores)

| Owner | Configuration Recommendations | Name | Description |
| --- | --- | --- | --- |
| Rule Result Aggregator | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default; compaction runs when 50% of the log is dirty, balancing performance and resource usage); min.compaction.lag.ms=0 (default; allows immediate compaction after a message is written); max.compaction.lag.ms=9223372036854775807 (default; no upper limit, so compaction is not forced by time); delete.retention.ms=86400000 (default, 24 hours; tombstone records, i.e. delete markers, are retained for 24 hours before being purged, allowing consumers time to process deletions) | {rm_kafka_topic_prefix}bci.cpm.ruleResultAggregator-{rm_kafka_topic_prefix}bci.cpm.stateStore.rra.ruleResult-changelog | Latest rule result per device and rule |
| Rule Value Provider | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.rawMeasurements-changelog | Raw measurements from PPMP measurement messages (max size per measuring point with default 50) |
| Rule Value Provider | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.1MinMeasurements-changelog | 1-minute aggregation of measurements from PPMP measurement messages |
| Rule Value Provider | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.30MinMeasurements-changelog | 30-minute aggregation of measurements from PPMP measurement messages |
| Rule Value Provider | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.12HoursMeasurements-changelog | 12-hour aggregation of measurements from PPMP measurement messages |
| Rule Value Provider | Key/Value-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueProvider-{rm_kafka_topic_prefix}bci.cpm.stateStore.rvp.rawMachineMessages-changelog | Codes from PPMP machine messages (max size per device with default 50) |
| Rule Value Aggregator | Window-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.1MinMeasurements-changelog | Needed for buffering all values BEFORE 1-minute aggregation |
| Rule Value Aggregator | Window-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.30MinMeasurements-changelog | Needed for buffering all values BEFORE 30-minute aggregation |
| Rule Value Aggregator | Window-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.12HoursMeasurements-changelog | Needed for buffering all values BEFORE 12-hour aggregation |
| Rule Value Aggregator | Repartition Topic; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default); compacted topic | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.1MinMeasurements-repartition | Needed for buffering all values BEFORE 1-minute aggregation |
| Rule Value Aggregator | Repartition Topic; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.30MinMeasurements-repartition | Needed for buffering all values BEFORE 30-minute aggregation |
| Rule Value Aggregator | Repartition Topic; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-{rm_kafka_topic_prefix}bci.cpm.windowStore.rva.12HoursMeasurements-repartition | Needed for buffering all values BEFORE 12-hour aggregation |
| Rule Value Aggregator | Window-Closed-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | Internal result for 1-minute aggregation |
| Rule Value Aggregator | Window-Closed-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | Internal result for 30-minute aggregation |
| Rule Value Aggregator | Window-Closed-State Store; partitions=5; replicas=3 (but > 1 depends on cluster size); cleanup.policy=compact (keep latest value per key, no retention time needed); min.cleanable.dirty.ratio=0.5 (default); min.compaction.lag.ms=0 (default); max.compaction.lag.ms=9223372036854775807 (default); delete.retention.ms=86400000 (24 hours, default) | {rm_kafka_topic_prefix}bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | Internal result for 12-hour aggregation |
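The internal topic names follow the usual Kafka Streams convention of `<application.id>-<store name>-changelog` and `<application.id>-<name>-repartition`. The sketch below reproduces one of the listed names under the assumption, inferred from the names themselves, that both the application ID and the store name carry the configured prefix. The helper functions are hypothetical.

```python
def changelog_topic(application_id, store_name):
    """Name of the changelog topic backing a Kafka Streams state store."""
    return f"{application_id}-{store_name}-changelog"


def repartition_topic(application_id, name):
    """Name of an internal Kafka Streams repartition topic."""
    return f"{application_id}-{name}-repartition"


prefix = "bci-prep15."  # example value of the topic prefix
app_id = prefix + "bci.cpm.ruleValueProvider"               # assumed application ID
store = prefix + "bci.cpm.stateStore.rvp.rawMeasurements"   # assumed store name

# prints bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog
print(changelog_topic(app_id, store))
```

This explains why the prefix appears twice in most internal topic names, which matters when the topics are pre-created manually.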

Example Kafka topic list with minimum configuration

The following example lists the Kafka topics with minimum configuration for an environment with RM_KAFKA_TOPIC_PREFIX="bci-prep15.". To reuse it, replace the prefix with your own hostname prefix:

| Topic | Cleanup policy | Replication Factor | Retention Time (days) | Retention Size (GB) | Restricted Access | Compression | Serialization Format | Partitions Count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| bci-prep15.bci.cpm.aggregatedMeasurements.1m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedMeasurements.12h | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedMeasurements.30m | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedRuleResult.changes | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.aggregatedRuleResult.positive | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.machineMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.machineMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.functionExecution.measurementMessage.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResult.<TENANT_ID> | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResult.internal | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.valueProvider.machineMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.valueProvider.measurementMessage | delete | 3 | 1 | 8.0 | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleResultAggregator-bci-prep15.bci.cpm.stateStore.rra.ruleResult-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-repartition | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.1MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.12HoursMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.30MinMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMachineMessages-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
| bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog | compact | 3 | - | - | yes | gzip | JSON | 5 |
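When adapting the example list to another environment, note that the prefix occurs twice in most changelog and repartition topic names, so every occurrence has to be replaced. A minimal sketch, where the function name `adapt_topic`, the target prefix `plant7.` and the tenant ID are hypothetical:

```python
EXAMPLE_PREFIX = "bci-prep15."
TENANT_PLACEHOLDER = "<TENANT_ID>"


def adapt_topic(example_topic, new_prefix, tenant_id=None):
    """Rewrite an example topic name for another environment.

    str.replace substitutes every occurrence, which covers internal
    topics where the prefix appears in both the application ID and
    the store name.
    """
    topic = example_topic.replace(EXAMPLE_PREFIX, new_prefix)
    if tenant_id is not None:
        topic = topic.replace(TENANT_PLACEHOLDER, tenant_id)
    return topic


print(adapt_topic(
    "bci-prep15.bci.cpm.ruleValueProvider-"
    "bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog",
    "plant7.",
))
print(adapt_topic("bci-prep15.bci.cpm.ruleResult.<TENANT_ID>", "plant7.",
                  tenant_id="TENANT_ID_A"))
```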

Authentication and authorization

To protect topic data from misuse, access to Kafka topics must be restricted. A separate "Rules Management" Kafka user should therefore be created (see Kafka Authentication (SSL)), and the following permissions need to be assigned via Kafka topic ACLs:

| Topic Name | Permissions "Rules Management"-Kafka-user | kafka-acls.sh option |
| --- | --- | --- |
| bci.cpm.* | READ + WRITE | "--producer" + "--consumer" |

This means that the Kafka user has read and write access to all topics starting with "bci.cpm.".

Warning: See also the Kafka documentation on how to secure topics through prefixed ACLs (--resource-pattern-type prefixed).
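As a rough illustration of a prefixed ACL setup, the sketch below assembles two `kafka-acls.sh` command strings, one using the `--producer` convenience option and one using `--consumer`. Several things here are assumptions: the principal name `rules-management`, the broker address, and the choice to reuse the topic prefix for the consumer group (the `--consumer` option requires a `--group`). If a topic prefix (namespace) is configured, the resource name presumably has to include it. With prefixed patterns the resource is given as `bci.cpm.` without a trailing `*`.

```python
import shlex


def prefixed_acl_commands(principal, topic_prefix, bootstrap_server,
                          group_prefix=None):
    """Build kafka-acls.sh commands granting producer and consumer access
    to all topics (and groups) whose names start with the given prefix."""
    group_prefix = topic_prefix if group_prefix is None else group_prefix
    base = [
        "kafka-acls.sh",
        "--bootstrap-server", bootstrap_server,
        "--add",
        "--allow-principal", f"User:{principal}",
        "--resource-pattern-type", "prefixed",
        "--topic", topic_prefix,
    ]
    producer = base + ["--producer"]
    consumer = base + ["--consumer", "--group", group_prefix]
    return [shlex.join(producer), shlex.join(consumer)]


# Principal and broker address are placeholders.
for command in prefixed_acl_commands("rules-management", "bci.cpm.",
                                     "kafka.example.internal:9092"):
    print(command)
```

The commands are only printed, so they can be reviewed by the Kafka administrator before anything is applied.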

ACL: Kafka topics for external functions

When integrating external functions with Rules Management, a separate Kafka user should be created for each customer (tenant). This user only has access to the topics needed for the integration. The following example uses the TENANT_ID of customer "A":

| Topic Name | Permissions tenant-specific-kafka-user | kafka-acls.sh options |
| --- | --- | --- |
| bci.cpm.functionExecution.measurementMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.functionExecution.machineMessage.TENANT_ID_A | READ | "--consumer" |
| bci.cpm.ruleResult.TENANT_ID_A | READ + WRITE | "--producer" |

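The tenant-specific permissions can be derived mechanically from the tenant ID. The following sketch returns the three topics and the permissions listed for them. The function name `tenant_acl_plan` and the optional `namespace` argument are hypothetical.

```python
def tenant_acl_plan(tenant_id, namespace=""):
    """Map each tenant-specific topic to the permissions its Kafka user needs."""
    base = f"{namespace}bci.cpm"
    return {
        f"{base}.functionExecution.measurementMessage.{tenant_id}": ("READ",),
        f"{base}.functionExecution.machineMessage.{tenant_id}": ("READ",),
        f"{base}.ruleResult.{tenant_id}": ("READ", "WRITE"),
    }


for topic, permissions in tenant_acl_plan("TENANT_ID_A").items():
    print(topic, "+".join(permissions))
```

The external function consumes from the two functionExecution topics and publishes its results to the ruleResult topic, which is why only the latter carries WRITE.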
Kafka topic service-configuration

The described topics can be created automatically by the Rules Management applications themselves. The following environment variables are provided for this purpose:

| Environment Variable | Type | Default | Description |
| --- | --- | --- | --- |
| RM_KAFKA_TOPIC_PREFIX | String | | To run multiple Rules Management instances (e.g. DEV, PROD, TEST), a prefix (namespace) can be used. For example, with RM_KAFKA_TOPIC_PREFIX=DEV- the topic bci.cpm.valueProvider.measurementMessage becomes DEV-bci.cpm.valueProvider.measurementMessage |
| RM_KAFKA_TOPIC_AUTO_CREATION | boolean | false | Flag indicating whether the required topics should be created during application startup. |
| RM_KAFKA_TOPIC_PARTITIONS | integer | 50 | Number of partitions for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
| RM_KAFKA_TOPIC_REPLICAS | integer | 1 | Number of replicas for all topics. Only applies when RM_KAFKA_TOPIC_AUTO_CREATION=true |
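To make the defaults concrete, the sketch below reads the four variables the way a consumer of this configuration might. It is not the Rules Management implementation. The `TopicSettings` class and `load_topic_settings` function are hypothetical, and treating a missing prefix as an empty string is an assumption, since no default is listed for it.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicSettings:
    prefix: str
    auto_creation: bool
    partitions: int
    replicas: int


def load_topic_settings(env=None):
    """Read the topic-related variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    auto = env.get("RM_KAFKA_TOPIC_AUTO_CREATION", "false").strip().lower()
    return TopicSettings(
        prefix=env.get("RM_KAFKA_TOPIC_PREFIX", ""),  # assumed empty when unset
        auto_creation=(auto == "true"),
        partitions=int(env.get("RM_KAFKA_TOPIC_PARTITIONS", "50")),
        replicas=int(env.get("RM_KAFKA_TOPIC_REPLICAS", "1")),
    )


# With no variables set, the documented defaults apply.
print(load_topic_settings({}))
```

Note that the default partition and replica counts differ from the recommended values of 5 partitions and 3 replicas, so both variables are worth setting explicitly when auto-creation is enabled.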


© Robert Bosch Manufacturing Solutions GmbH 2023-2025, all rights reserved
