Send Notification Messages for state changes

This page details the functionality implemented in the Digital Twin Registry (DTR) for sending notifications into the Notification Messages whenever the state of an owned entity changes.

It also explains the necessary steps for connecting to the Notification Messages and consuming those pushed notification messages.

The functionality of Notification Messages is currently in a preview state, therefore this feature is available to dedicated customers only. For enabling the Notification Messages, click the following link to find a prepared email with placeholders for all required content: support.semantic-stack@bosch.com

Hello Bosch Semantic Stack team,

I would like to have the functionality of Notification Messages enabled within the Bosch Semantic Stack application for the tenant: insert your tenant name here

Upon success, you will receive an answer via email within a few days.

Pushing (providing) messages to Notification Messages

The Digital Twin Registry offers a sub-service called Notification Service, which is realized with the Apache Kafka message broker. The Notification Service pushes Kafka messages into a specific tenant-based topic whenever one of the following domain entities is changed (created, merged, replaced, deleted etc.):

For each resource the Digital Twin Registry defines new message types as Apache Avro schemas and publishes them automatically in the Schema Registry using the _RecordNameStrategy as Subject Name Strategy whenever new type of events are pushed into the Notification Messages

This strategy derives the subject name from the fully qualified Avro record name, and provides further a way to group logically the related events that may have different data structures under a topic.

The RecordNameStrategy attribute of each Kafka message helps also the Kafka messages Consumer applications in automatically detecting and using the proper schema from the defined Schema Registry to de-serialize the received Kafka message.

Details about pushed messages

Message Header has elements as detailed in below table:

Attribute Type Description Required Default

Content-Type

String

Content type of the payload e.g. avro, application-json etc.

Y

application/avro

Event-Type

ENUM/String

CREATED, MERGED, REPLACED, DELETED

Y

Entity-Id

Long

The ID of the referenced entity.

Y

Subject-Name

String

Subject name of the record. This is also the name of schema in the Schema Registry.

Y

kafka_messageKey

String

Subject name of the record. That the records with the same type are on the same partition.

Y

kafka_timestamp

Long

timestamp

Y

Message Content has the following characteristics:

Attribute type Description Required Default

body

byte[]

Payload of the DTO after the operation was successfully.

Y

The screenshots below showcase a visual representation of such Kafka message being pushed to Notification Messages when a new twin was requested to be created in Digital Twin Registry.

Digital Twin message in Notification Messages - Key
Digital Twin message in Notification Messages - Header
Digital Twin message in Notification Messages - Body

Consuming messages from the Notification Messages

Before registering to receive notifications per tenant ID, a user needs to make sure to have the role "MESSAGE_OPERATOR" in the Access Management (MACMA). Please check Authentication and Authorization for more information about how to authenticate and be authorized in Digital Twin Registry.

Connecting to the Notification Messages

Using your tenant ID, access the Client Credentials REST Endpoint of Digital Twin Registry at <host>/settings-api/v1/<your-tenant-id>/client-credentials for retrieving back the credentials, connection URL and schema registry through the received REST response.

Below you can find an example of the JSON response:

{
    "apiKeyName": "4dfe0288ddd911edb5ea0242ac120002",
    "apiKeySecret": "SaxtZM83yeeQPVnIb6aT",
    "schemaRegistryApiKeyName": "sdadsas0288ddd911edb5ea0242ac120002",
    "schemaRegistryApiKeySecret": "asdvZM83yeeQPVnIb6aT",
    "kafkaClusterUrl": ["SASL_SSL://abc.westeurope.azure.confluent.cloud:9092"],
    "schemaRegistryUrl": "https://abc.westeurope.azure.confluent.cloud"
}

Explanations for each REST Response attribute are given in the table below:

Property Name Type Description

apiKeyName

String

The API Key Name used to subscribe to the Kafka Cluster

apiKeySecret

String

The API Key Secret used to subscribe to the Kafka Cluster

schemaRegistryApiKeyName

String

The API Key Name used to subscribe to the Schema Registry

schemaRegistryApiKeySecret

String

The API Key Secret used to subscribe to the Schema Registry

kafkaClusterUrl

Array[string]

The URL of the Kafka Server

schemaRegistryUrl

String

The URL of Schema Registry used for serializing/de-serializing the Kafka message

Learn more about retrieving the connection settings to Notification Messages by consulting the Registry Settings API.

Connecting to the Notification Messages

Using the credentials, your client can further authenticate in kafkaClusterUrl and connect to the tenant-based Kafka topic having the name pattern: registry.message.api.event.tenantID and it can now consume Kafka messages.

The de-serialization of the received Kafka messages should be performed with help of the published schemaRegistryUrl, which could be accessed using the schemaRegistry related credentials.

Below is a basic configuration of a Spring Boot Consumer able to receive events from the Notification Messages.

spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.group-id=dtr
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
spring.kafka.event.topic-name-test=registry.message.api.event.${tenant-id}
spring.kafka.consumer.bootstrap-servers=url
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.request.timeout.ms=20000
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.retry.backoff.ms=500
# Schema Registry specific settings
spring.kafka.properties.schema.registry.url=url
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.schema.registry.basic.auth.user.info=user-name:secret

Summary

Now the sub-service for pushing Notification Messages is successfully set up and the Digital Twin Registry UI can be used to push messages about twins, twin groups or other domain entities. These can then be consumed by any user of the tenant (after performing the explained configuration) via browser or programmatically, for example, via Spring Boot Consumer.