Nexeed
    • Introduction
    • User manual
      • Condition monitoring and its tabs
        • Live
        • Counters
        • Measurements
        • Events
        • Rules
        • View configuration
        • Details
      • Rules management
        • Rule types and standard functions
        • Rule details
      • Function configuration
      • Condition Monitoring widgets
      • Access Management
        • Application Roles
        • Fine-Grained Access Control and Configuration
        • How to Configure Organization Roles
    • Operations manual
      • Overview
      • System architecture and interfaces
        • System components
      • System requirements
        • General notes
        • cm/condition-monitoring-core
        • cm/rule-service-app
        • cm/rule-function-executor
        • cm/rule-result-aggregator
        • cm/rule-value-aggregator
        • cm/rule-value-provider
        • cm/stateful-function-executor
      • Migration from previous versions
        • Migration to 2.1+
        • Migration from CPM 1.5.4 to CM and RM 3.0.x (Nexeed IAS 2023.02.00.xx)
          • CPM to CM relational database migration
          • CPM to RM relational database migration
          • CM Influx database migration
          • Deletion of an old CPM installation
        • Resources mapping from MES to IAS Condition Monitoring
        • Migration to 4.0.0+ (Nexeed IAS 2024.01.00.xx)
        • Migration to 4.3.x (Nexeed IAS 2024.02.01.x)
        • Migration to 4.5.x (Nexeed IAS 2025.01.00.x)
        • Migration to 4.6.x (Nexeed IAS 2025.01.01.x)
        • Migration to 4.8.x (Nexeed IAS 2025.02.00.x)
        • Migration to 4.9.x (Nexeed IAS 2025.02.01.x)
        • Migration to 4.10.x (Nexeed IAS 2025.02.02.x)
        • Migration to 4.11.x (Nexeed IAS 2026.01.x)
      • Setup and configuration
        • Manual MACMA configuration after setting up a new tenant
        • RabbitMQ
        • Influx configuration
        • Kafka topics
        • Condition Monitoring - Helm Configuration
        • Advanced configuration parameters
          • cm/condition-monitoring-core
            • Common shared variables
            • Portal shared variables
            • MDM shared variables
            • RabbitMQ shared variables
            • OTEL shared variables
            • Graceful shutdown shared variables
          • cm/rule-service-app
            • Rules Management shared variables
            • KAFKA shared variables
          • cm/rule-function-executor
          • cm/rule-result-aggregator
          • cm/rule-value-aggregator
          • cm/rule-value-provider
          • cm/stateful-function-executor
      • Start and shutdown
      • Regular operations
      • Failure handling
        • Rule Management Light Helm installation failing when Kafka is disabled or Kafka is not configured at all
        • User manual injection into Rule Management
        • Infrastructure outages: health verification Endpoints
        • OPP/PPMP are not received in CM
        • Master data (Devices, Facilities, Measuring Points, DeviceTypes) is missing in CM
        • Condition Monitoring Domain is missing in Ticket Management
        • CM is not visible in the portal
        • How to verify if the broker is out of sync
      • Backup and Restore
      • Logging and monitoring
        • General logging characteristics
        • Required monitoring
        • General logging format
        • Request-based logging format
        • Security logging format
        • Lifecycle logging format
        • Module health Endpoints and K8s probes
      • Known limitations
    • API documentation
      • Condition Monitoring HTTP API
      • Rules Management HTTP API
    • Glossary
Condition Monitoring
  • Industrial Application System
  • Core Services
    • Block Management
    • Deviation Processor
    • ID Builder
    • Multitenant Access Control
    • Notification Service
    • Ticket Management
    • Web Portal
  • Shopfloor Management
    • Andon Live
    • Global Production Overview
    • KPI Reporting
    • Operational Routines
    • Shift Book
    • Shopfloor Management Administration
  • Product & Quality
    • Product Setup Management
    • Part Traceability
    • Process Quality
    • Setup Specs
  • Execution
    • Line Control
    • Material Management
    • Order Management
    • Packaging Control
    • Rework Control
  • Intralogistics
    • AGV Control Center
    • Stock Management
    • Transport Management
  • Machine & Equipment
    • Condition Monitoring
    • Device Portal
    • Maintenance Management
    • Tool Management
  • Enterprise & Shopfloor Integration
    • Archiving Bridge
    • Data Publisher
    • Engineering UI
    • ERP Connectivity
    • Gateway
    • Information Router
    • Master Data Management
    • Orchestrator

Nexeed Learning Portal

  • Condition Monitoring
  • Operations manual
  • Setup and configuration
  • Kafka topics
preview 4.11.0 4.10.1 4.10.0

Kafka topics

Pattern

Topics will be generally described by their domain and follow this pattern:

{NAMESPACE}bci.cpm.{DOMAIN}[OPTIONAL.\{SUB_DOMAIN 1}.\{SUB_DOMAIN N}][OPTIONAL.{internal}|{tenant-ID}]

The namespace can be configured by the environment variable rm_kafka_topic_prefix (see also Kafka Topic Service-Configuration)

Topics are usually created manually by the Kafka administrator (request via IT-Service Portal for BD Kafka environments). The topic configurations are listed in the table below. Topics configuration details can be found: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#

However, if the environment variable rm_kafka_topic_auto_creation is set to true, the Rules Management applications will automatically create the topics at startup.

Topic list

Rules Management Topic Name Configuration Recommendations Description Publisher Consumer Group Name

rm_kafka_topic_prefixbci.cpm.valueProvider.measurementMessage

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • PPMP messages with rules for storing previous values/aggregations and enriching rules with requested data (previous values/aggregations)

  • Rule Service App (Context Router)

  • Rule Value Provider: name=bci.cpm.ruleValueProvider

  • Rule Value Aggregator: name=bci.cpm.ruleValueAggregator

rm_kafka_topic_prefixbci.cpm.valueProvider.machineMessage

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

PPMP machine messages with rules for storing previous values and enriching rules with requested data (previous values)

  • Rule Service App (Context Router)

  • Rule Value Provider: name=bci.cpm.ruleValueProvider

  • Rule Value Aggregator: name=bci.cpm.ruleValueAggregator

rm_kafka_topic_prefixbci.cpm.functionExecution.measurementMessage.internal

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Executing standard functions for rules (internal) only PPMP measurement messages

  • Rule Service App (Context Router)

  • Rule Value Provider

  • Rule Function Executor: name=bci.cpm.ruleFunctionExecutor

rm_kafka_topic_prefixbci.cpm.functionExecution.machineMessage.internal

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Executing standard functions for rules (internal) only PPMP machine messages

  • Rule Service App (Context Router)

  • Rule Value Provider

  • Rule Function Executor: name=rm_kafka_topic_prefixbci.cpm.ruleFunctionExecutor

rm_kafka_topic_prefixbci.cpm.functionExecution.measurementMessage.[TENANT_ID]

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Executing custom functions for rules (external) only PPMP measurement messages. One exclusive topic per customer is planned in the beginning.

  • Rule Service App (Context Router)

  • Rule Value Provider

  • External function consumer by customer (only PPMP measurement messages)

rm_kafka_topic_prefixbci.cpm.functionExecution.machineMessage.[TENANT_ID]

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Executing custom functions for rules (external). One exclusive topic per customer is planned in the beginning.

  • Rule Service App (Context Router)

  • Rule Value Provider

  • External function consumer by customer (only PPMP machine messages)

rm_kafka_topic_prefixbci.cpm.ruleResult.internal

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Aggregating all rule results from standard rules (internal)

  • Rule Function Executor

  • Rule Result Aggregator: name=bci.cpm.ruleResultAggregator

rm_kafka_topic_prefixbci.cpm.ruleResult.[TENANT_ID]

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Aggregating all rule results from custom rules (external)

  • External function by customer

  • Rule Result Aggregator name=bci.cpm.ruleResultAggregator

rm_kafka_topic_prefixbci.cpm.aggregatedRuleResult.positive

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Only positive rules

  • Rule Result Aggregator

  • Rule Service App (Context Router): name=bci.cpm.ruleService

rm_kafka_topic_prefixbci.cpm.aggregatedRuleResult.changes

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • Only rule result changes (current state of the rule: rule is triggered or not)

  • Rule Result Aggregator

  • Rule Service App (Context Router): name=bci.cpm.ruleService

rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.1m

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • 1-minute aggregations for raw measurements (only measuring points where a rule exists)

  • Rule Value Aggregator

  • Rule Value Provider: name=bci.cpm.ruleValueProvider

  • Rule Service App (Context Router): name=bci.cpm.ruleService

rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.30m

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • 30-minute aggregations for raw measurements (only measuring points where a rule exists)

  • Rule Value Aggregator

  • Rule Value Provider: name=bci.cpm.ruleValueProvider

  • Rule Service App (Context Router): name=bci.cpm.ruleService

rm_kafka_topic_prefixbci.cpm.aggregatedMeasurements.12h

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=delete

  • retention.ms=86400000 # 1 day

  • 12-hours aggregations for raw measurements (only measuring points where a rule exists)

  • Rule Value Aggregator

  • Rule Value Provider: name=bci.cpm.ruleValueProvider

  • Rule Service App (Context Router): name=bci.cpm.ruleService

Internal topics (state stores)

Owner Configuration Recommendations Name Description

Rule Result Aggregator

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default) # Compaction runs when 50% of the log is dirty, balancing performance and resource usage

  • min.compaction.lag.ms=0 (default) # Allows immediate compaction after a message is written.

  • max.compaction.lag.ms=9223372036854775807 (default) # No upper limit, so compaction is not forced by time.

  • delete.retention.ms=86400000 (default) # 24 hours #ombstone (delete marker) records are retained for 24 hours before being purged, allowing consumers time to process deletions.

rm_kafka_topic_prefixbci.cpm.ruleResultAggregator-rm_kafka_topic_prefixbci.cpm.stateStore.rra.ruleResult-changelog

  • Latest rule result per device and rule

Rule Value Provider

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.rawMeasurements-changelog

  • Raw measurements from PPMP measurements messages (max size per measuring point with default 50)

Rule Value Provider

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.1MinMeasurements-changelog

  • 1 minute aggregation of measurements from PPMP measurements messages

Rule Value Provider

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.30MinMeasurements-changelog

  • 30 minute aggregation of measurements from PPMP measurements messages

Rule Value Provider

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.12HoursMeasurements-changelog

  • 12 hour aggregation of measurements from PPMP measurements messages

Rule Value Provider

  • Key/Value-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueProvider-rm_kafka_topic_prefixbci.cpm.stateStore.rvp.rawMachineMessages-changelog

  • Codes from PPMP machine messages (max size per device with default 50)

Rule Value Aggregator

  • Window-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.1MinMeasurements-changelog

  • Needed for buffering all values BEFORE 1 minute aggregation

Rule Value Aggregator

  • Window-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.30MinMeasurements-changelog

  • Needed for buffering all values BEFORE 30 minute aggregation

Rule Value Aggregator

  • Window-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.12HoursMeasurements-changelog

  • Needed for buffering all values BEFORE 12 hour aggregation

Rule Value Aggregator

  • Repartition Topic

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

  • compacted topic

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.1MinMeasurements-repartition

  • Needed for buffering all values BEFORE 1 minute aggregation

Rule Value Aggregator

  • Repartition Topic

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.30MinMeasurements-repartition

  • Needed for buffering all values BEFORE 30 minute aggregation

Rule Value Aggregator

  • Repartition Topic

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-rm_kafka_topic_prefixbci.cpm.windowStore.rva.12HoursMeasurements-repartition

  • Needed for buffering all values BEFORE 12 hour aggregation

Rule Value Aggregator

  • Window-Closed-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog

  • Internal result for 1 minute aggregation

Rule Value Aggregator

  • Window-Closed-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog

  • Internal result for 30 minute aggregation

Rule Value Aggregator

  • Window-Closed-State Store

  • partitions=5

  • replicas=3 (but > 1 depends on cluster size)

  • cleanup.policy=compact # Keep latest value per key, no retention.time needed

  • min.cleanable.dirty.ratio=0.5 (default)

  • min.compaction.lag.ms=0 (default)

  • max.compaction.lag.ms=9223372036854775807 (default)

  • delete.retention.ms=86400000 # 24 hours (default)

rm_kafka_topic_prefixbci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog

  • Internal result for 12 hour aggregation

Example Kafka topic list with minimum configuration

Here is the example kafka topics with minimum configuration for an environment with RM_KAFKA_TOPIC_PREFIX="bci-prep15.", you just need to replace the prefix with your own hostname prefix:

Topic Cleanup policy Replication Factor Retention Time (days) Retention Size (GB) Restricted Access Compression Serialization Format Partitions Count Message timestamp Type Access & Authorization

bci-prep15.bci.cpm.aggregatedMeasurements.1m

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.aggregatedMeasurements.12h

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.aggregatedMeasurements.30m

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.aggregatedRuleResult.changes

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.aggregatedRuleResult.positive

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.functionExecution.machineMessage.<TENANT_ID>

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.functionExecution.measurementMessage.<TENANT_ID>

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.functionExecution.machineMessage.internal

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.functionExecution.measurementMessage.internal

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleResult.<TENANT_ID>

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleResult.internal

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.valueProvider.machineMessage

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.valueProvider.measurementMessage

delete

3

1

8.0

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleResultAggregator-bci-prep15.bci.cpm.stateStore.rra.ruleResult-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-1m_aggregation_window_closed-store-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-12h_aggregation_window_closed-store-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-30m_aggregation_window_closed-store-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.1MinMeasurements-repartition

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.12HoursMeasurements-repartition

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueAggregator-bci-prep15.bci.cpm.windowStore.rva.30MinMeasurements-repartition

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.1MinMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.12HoursMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.30MinMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMachineMessages-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

bci-prep15.bci.cpm.ruleValueProvider-bci-prep15.bci.cpm.stateStore.rvp.rawMeasurements-changelog

compact

3

-

-

yes

gzip

JSON

5

CreateTime

Certificate Authorization, Common Name: *.bci-prep15.rb-ias.bosch.com

Authentication and authorization

In order to protect topic data from misuse the access to Kafka topics needs to be restricted. Therefore, a separate "Rules Management"-Kafka-user should be created (see Kafka Authentication (SSL)) and following permissions needs to be assigned via Kafka Topic ACL:

Topic Name Permissions "Rules Management"-Kafka-user kafka-acls.sh option

bci.cpm.*

READ + WRITE

"-producer" + "--consumer"

This means that the Kafka user has read and write access to all topics starting with "bci.cpm.".

(warning) See also Kafka documentation how to secure topics through prefixed ACLs (--resource-pattern-type prefixed).

ACL: Kafka topics for external functions

When integrating external functions within Rules Management a separate Kafka user should be created for every single customer (tenant). This user has only access to certain topics that are needed for the integration. In this example we are using TENANT_ID of "A"-customer:

Topic Name Permissions tenant-specific-kafka-user kafka-acls.sh options

bci.cpm.functionExecution.measurementMessage.TENANT_ID_A

READ

"--consumer"

bci.cpm.functionExecution.machineMessage.TENANT_ID_A

READ

"--consumer"

bci.cpm.ruleResult.TENANT_ID_A

READ + WRITE

"–producer"

Kafka topic service-configuration

The described topics can be created automatically by the Rules Management applications itself. Therefore the following environment variables are provided:

Environment Variable Type Default Description

RM_KAFKA_TOPIC_PREFIX

String

In order to run multiple Rules Management (e.g. DEV, PROD, TEST) a prefix (namespace) can be used e.g. RM_KAFKA_TOPIC_PREFIX=DEV- bci.cpm.valueProvider.measurementMessage → DEV-bci.cpm.valueProvider.measurementMessage

RM_KAFKA_TOPIC_AUTO_CREATION

boolean

false

Flag, if the required topics should be created during application start up.

RM_KAFKA_TOPIC_PARTITIONS

integer

50

Number of partitions for all topics. Only when RM_KAFKA_TOPIC_AUTO_CREATION=true

RM_KAFKA_TOPIC_REPLICAS

integer

1

Number of replicas for all topics. Only when RM_KAFKA_TOPIC_AUTO_CREATION=true

How To: Create Kafka Topics and Configure for On Prem BD Environments

Here is step-by-step guide for creating kafka topics and configuring for On Prem Environment:

Step 1: Requesting Kafka Topic Creation

All Kafka topic creation, modification, or deletion requests must go through the IT Service Portal.

Access the IT Service Portal

  • Navigate to the IT Service Portal.

  • Search for and select the service: Apache Kafka Topics (Create/Change/Delete).

Fill out the Request Form

  • Refer to the example Kafka topic list in your operations manual for guidance.

  • Crucially, you need to change the prefix of the topics according to your server hostname. This ensures topic uniqueness and proper organization.

Cleanup Policy Considerations

  • For topics requiring a cleanup.policy of delete (the default for most log-like topics), create one separate request for all such topics.

  • For topics requiring a cleanup.policy of compact (for key-value store-like topics where only the latest value for a key is retained), create another separate request for all such topics.

For more details about the Kafka topics creation/change/delete process please refer to: Kafka Topic Ordering - Data Streaming Platform

Step 2: Configuring Your K3s/Radium Environment

Once your Kafka topics are approved and created, you need to configure your application environment to connect to and use them. This is done by modifying your custom-values.yml file.

Edit custom-values.yml

Open your custom-values.yml file for editing.

Add Local Kafka Configuration

Under the cm: section, add the following local configuration:

cm:
  local:
    kafkaClientSSLKeyPassword: "<pwd>"
    kafkaClientSSLKeystorePassword: "<pwd>"
    kafkaTopicPrefix: "bci-prep15."
    kafkaTopicPartitions: "5"
    kafkaTopicAutoCreation: false
  • kafkaClientSSLKeyPassword and kafkaClientSSLKeystorePassword: Replace "<pwd>" with the actual passwords for your Kafka client SSL key and keystore.

  • kafkaTopicPrefix: This is critically important. Ensure this prefix exactly matches the prefix you used when requesting topic creation through the IT Service Portal. For example, if your server hostname is bci-prep16, your topic prefix would be bci-prep16..

  • kafkaTopicPartitions: Specify the desired number of partitions for your topics. This should ideally align with what was requested during topic creation.

  • kafkaTopicAutoCreation: false: This setting is crucial. Since you are requesting topics through a formal process, you want to prevent your application from attempting to auto-create topics, which could lead to inconsistencies or errors.

Verify Kafka Cluster Configuration (Existing)

Ensure the custom-values.yml file also contains the correct Kafka cluster configuration under the global and modules sections, as shown in the example:

global:
  serverInstances:
    externalkafka:
      adminPassword: ""
      hosts:
        - <host1>
        - <host2>
        - <host3>
      port: 9093
      tls: true
      type: KAFKA
      default: false

modules:
  cm:
    enabled: true
    ruleManagementLightEnabled: false
    messaging:
      kafkarm:
        serverInstance: externalkafka

Step 3: Deploy/Update Your Environment

With the custom-values.yml updated, you can now deploy or update your on Prem environment and ensure that the kafka services are running like:

  • rule-service-app

  • rule-function-executor

  • rule-value-aggregator

  • rule-value-provider

  • rule-result-aggregator

Contents

© Robert Bosch Manufacturing Solutions GmbH 2023-2025, all rights reserved

Changelog Corporate information Legal notice Data protection notice Third party licenses