Send Notification Messages for state changes
This page details the functionality implemented in the Digital Twin Registry (DTR) for sending notifications into the Notification Messages whenever the state of an owned entity changes.
It also explains the necessary steps for connecting to the Notification Messages and consuming those pushed notification messages.
The functionality of Notification Messages is currently in a preview state, therefore this feature is available to dedicated customers only. For enabling the Notification Messages, click the following link to find a prepared email with placeholders for all required content: support.semantic-stack@bosch.com |
Hello Bosch Semantic Stack team,
I would like to have the functionality of Notification Messages enabled within the Bosch Semantic Stack application for the tenant: insert your tenant name here
Upon success, you will receive an answer via email within a few days.
Pushing (providing) messages to Notification Messages
The Digital Twin Registry offers a sub-service called Notification Service, which is realized with the Apache Kafka message broker. The Notification Service pushes Kafka messages into a specific tenant-based topic whenever one of the following domain entities is changed (created, merged, replaced, deleted etc.):
For each resource the Digital Twin Registry defines new message types as Apache Avro schemas and publishes them automatically in the Schema Registry using the RecordNameStrategy as Subject Name Strategy whenever new type of events are pushed into the Notification Messages
This strategy derives the subject name from the fully qualified Avro record name, and provides further a way to group logically the related events that may have different data structures under a topic.
The RecordNameStrategy attribute of each Kafka message helps also the Kafka messages Consumer applications in automatically detecting and using the proper schema from the defined Schema Registry to de-serialize the received Kafka message.
Details about pushed messages
Message Header has elements as detailed in below table:
Attribute | Type | Description | Required | Default |
---|---|---|---|---|
Content-Type |
String |
Content type of the payload e.g. avro, application-json etc. |
Y |
application/avro |
Event-Type |
ENUM/String |
CREATED, MERGED, REPLACED, DELETED |
Y |
|
Entity-Id |
Long |
The ID of the referenced entity. |
Y |
|
Subject-Name |
String |
Subject name of the record. This is also the name of schema in the Schema Registry. |
Y |
|
kafka_messageKey |
String |
Subject name of the record. That the records with the same type are on the same partition. |
Y |
|
kafka_timestamp |
Long |
timestamp |
Y |
Message Content has the following characteristics:
Attribute | type | Description | Required | Default |
---|---|---|---|---|
body |
byte[] |
Payload of the DTO after the operation was successfully. |
Y |
The screenshots below showcase a visual representation of such Kafka message being pushed to Notification Messages when a new twin was requested to be created in Digital Twin Registry.
Consuming messages from the Notification Messages
Before registering to receive notifications per tenant ID, a user needs to make sure to have the role "MESSAGE_OPERATOR" in the Access Management (MACMA). Please check Authentication and Authorization for more information about how to authenticate and be authorized in Digital Twin Registry. |
Connecting to the Notification Messages
Using your tenant ID, access the Client Credentials REST Endpoint of Digital Twin Registry at <host>/settings-api/v1/<your-tenant-id>/client-credentials
for retrieving back the credentials, connection URL and schema registry through the received REST response.
Below you can find an example of the JSON response:
{
"apiKeyName": "4dfe0288ddd911edb5ea0242ac120002",
"apiKeySecret": "SaxtZM83yeeQPVnIb6aT",
"schemaRegistryApiKeyName": "sdadsas0288ddd911edb5ea0242ac120002",
"schemaRegistryApiKeySecret": "asdvZM83yeeQPVnIb6aT",
"kafkaClusterUrl": ["SASL_SSL://abc.westeurope.azure.confluent.cloud:9092"],
"schemaRegistryUrl": "https://abc.westeurope.azure.confluent.cloud"
}
Explanations for each REST Response attribute are given in the table below:
Property Name | Type | Description |
---|---|---|
apiKeyName |
String |
The API Key Name used to subscribe to the Kafka Cluster |
apiKeySecret |
String |
The API Key Secret used to subscribe to the Kafka Cluster |
schemaRegistryApiKeyName |
String |
The API Key Name used to subscribe to the Schema Registry |
schemaRegistryApiKeySecret |
String |
The API Key Secret used to subscribe to the Schema Registry |
kafkaClusterUrl |
Array[string] |
The URL of the Kafka Server |
schemaRegistryUrl |
String |
The URL of Schema Registry used for serializing/de-serializing the Kafka message |
Learn more about retrieving the connection settings to Notification Messages by consulting the Registry Settings API.
Connecting to the Notification Messages
Using the credentials, your client can further authenticate in kafkaClusterUrl and connect to the tenant-based Kafka topic having the name pattern: registry.message.api.event.tenantID
and it can now consume Kafka messages.
The de-serialization of the received Kafka messages should be performed with help of the published schemaRegistryUrl
, which could be accessed using the schemaRegistry related credentials.
Below is a basic configuration of a Spring Boot Consumer able to receive events from the Notification Messages.
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.group-id=dtr
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
spring.kafka.event.topic-name-test=registry.message.api.event.${tenant-id}
spring.kafka.consumer.bootstrap-servers=url
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.request.timeout.ms=20000
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.retry.backoff.ms=500
# Schema Registry specific settings
spring.kafka.properties.schema.registry.url=url
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.schema.registry.basic.auth.user.info=user-name:secret
Summary
Now the sub-service for pushing Notification Messages is successfully set up and the Digital Twin Registry UI can be used to push messages about twins, twin groups or other domain entities. These can then be consumed by any user of the tenant (after performing the explained configuration) via browser or programmatically, for example, via Spring Boot Consumer.