MicroProfile Reactive Messaging 2.0 in WildFly 25

For WildFly 25, we upgraded the MicroProfile Reactive Messaging support from version 1 to 2. It contains a new @Channel annotation, which in conjunction with the new Emitter interface, were introduced in order to make it possible to push data into the MicroProfile Reactive streams from code initiated by a user.

The MicroProfile Reactive Messaing implementation in WildFly is based on the SmallRye Reactive Messaging project. The version included in WildFly 25, introduces a new API to have more control over how we interact with Kafka. We expose this API in WildFly 25.

This post will:

  • Take a simple web application, consisting of a few html pages, and add a Servlet filter to push information about page visits into Reactive Messaging via an Emitter.

    • These messages will be forwarded onto Kafka

  • Show a standalone application to read the last visited page per user from Kafka via the Kafka Streams API

  • Deploy the above application into WildFly, bundling the Kafka Streams API (which we don’t ship in WildFly) to read the last visited page per user.

The code for the application is on GitHub. Additionally, you can find more information about the MicroProfile Reactive Messaging functionality in WildFly in the WildFly documentation.

Running the application

See the GitHub repository README for instructions on how to build and run the different parts of the application. Here we will focus on explaining how it works.

The main application

The main application is contained in the app folder.

The core of the application is a few html pages which link to each other. Now, we want to track which user visited which page. We do this by enhancing the application with a Servlet filter called MessagingFilter:

public class MessagingFilter extends HttpFilter {
    @Inject
    @Channel("from-filter")
    Emitter<PageVisit> messagingEmitter;

FIrst we have field injected via CDI of type Emitter, called messagingEmitter, which is annotated with @Channel. This Emitter instance makes it a breeze to push data to the MicroProfile Reactive Messaging stream indicated by the value of the @Channel annotation (i.e. from-filter).

    @Override
    public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws IOException, ServletException {
        String user = getUsername(request);
        String address = request.getRemoteAddr();
        String page = Paths.get(request.getRequestURI()).getFileName().toString();


        PageVisit pv = new PageVisit(user, address, page);
        messagingEmitter.send(pv);

Next we gather information about the request (user, address and page name), and bundle this information up in a PageVisit bean.

We then use the injected Emitter to send the PageVisit instance. The Emitter will then send the PageVisit to the from-filter stream.

        // Disable caching for the html pages
        ((HttpServletResponse)response).addHeader("Cache-control", "no-store");
        ((HttpServletResponse)response).addHeader("Pragma", "no-cache");

        filterChain.doFilter(request, response);
    }

Getting the user name is simulated in the following method which randomly chooses a user for the current request. For the purposes of this demo this is to get a few different users in the data recorded when we click on the links when running the application.

    private String getUsername(HttpServletRequest servletRequest) {
        // Pretend we're looking up the authenticated user
        switch ((int)Math.round(Math.random() * 3)) {
            case 0:
                return "bob";
            case 1:
                return "emma";
            case 2:
                return "frank";
            case 3:
                return "linda";
        }
        return null;
    }
}

Next, we have an ApplicationScoped CDI bean called MessagingBean

@ApplicationScoped
public class MessagingBean {
    @Inject
    @Channel("special")
    Emitter<PageVisit> special;

    @Incoming("from-filter")
    @Outgoing("kafka-visits")
    public Message<PageVisit> fromFilter(PageVisit pageVisit) {
        if (pageVisit.getPage().equals("3.html")) {
            special.send(pageVisit);
        }
        Message<PageVisit> msg = Message.of(pageVisit);
        msg = KafkaMetadataUtil.writeOutgoingKafkaMetadata(
                msg,
                OutgoingKafkaRecordMetadata
                        .<String>builder()
                        .withKey(pageVisit.getUserName())
                        .build());
        return msg;
    }

    @Incoming("special")
    public void special(PageVisit pageVisit) {
        System.out.println("===> " + pageVisit.getUserName() + " visited " + pageVisit.getPage());
    }
}

The fromFilter() method is annotated with the @Incoming("from-filter") annotation (from version 1 of the specification) and will receive all messages that were sent on our previous Emitter.

Since the both the @Incoming and @Channel annotations use the value from-filter (i.e. they match), we end up with a simple in-memory stream. We could of course have routed this via Kafka, but for this example I wanted to keep the configuration needed to map to Kafka as simple as possible. The WildFly documentation goes into more details about how to configure MicroProfile Reactive Messaging streams to consume from Kafka topics.

The fromFilter() method is also annotated with @Outgoing("kafka-visits"), and so it is expected that all incoming messages from the from-filter stream will be forwarded onto the kafka-visits stream.

The kafka-visits stream is backed by Kafka (we will see how to map this stream onto a Kafka topic in a second). In this case we decide that we want messages sent on this topic to have a Kafka key, so we:

  • Wrap the incoming PageVisit object in a Message object, which comes from the MicroProfile Reactive Messaging specification.

  • We then create an OutgoingKafkaRecordMetadata instance, where we set the key of the record to be the user. We add this metadata to the message by calling KafkaMetadataUtil.writeOutgoingKafkaMetadata(). The mentioned classes come from the new SmallRye Kafka API.

  • Finally we return the massaged Message containing our received PageVisit instance, which will forward it to the kafka-visits stream.

Another thing going on in this example, is that we’re using an injected Emitter to 'fork' the sending of the received data to an additional location. In fromFilter(), if the page 3.html was visited, we will also send the received PageVisit via the injected Emitter. This in turn will send the PageVisit instance on the special stream indicated in its @Channel annotation.

The special() method, annotated with @Incoming(`special) receives messages from the special stream (i.e. the ones sent via the Emitter).

When running the application, and clicking on the 3 link, you should see output in the server logs. Additionally, every click on any link will show up in the Kafka consumer logs mentioned in the example README. So, in addition to being able to easily send data from user-initiated code, Emitter is useful for 'forking' streams, so you can send data to more than one location. This functionality was not present in version 1 of the specification.

To map the kafka-visits stream to a Kafka topic we do the configuration in microprofile-config.properties:

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-visits.connector=smallrye-kafka
mp.messaging.outgoing.kafka-visits.topic=page-visits
mp.messaging.outgoing.kafka-visits.value.serializer=org.wildfly.blog.reactive.messaging.common.PageVisitsSerializer

This points the mapping towards localhost:9092 to connect to Kafka, maps the kafka-visits stream to the page-visits kafka topic, and specifies PageVisitsSerializer to be used to serialize the PageVisit instances that we send to Kafka. The WildFly documentation contains more detailed information about this configuration.

If you deploy the application into WildFly, as outlined in the example README, and you performed the optional step of connecting a Kafka consumer, you should see the output similar to this in the Kafka consumer terminal as you click the links in the application hosted at http://localhost:8080/app/:

frank	127.0.0.1app
emma	127.0.0.13.html
frank	127.0.0.11.html
linda	127.0.0.13.html
frank	127.0.0.11.html
emma	127.0.0.12.html
frank	127.0.0.13.html

When you visit 3.html, there will be additional output from the special() method in WildFly’s server.log

===> emma visited 3.html
===> linda visited 3.html
===> frank visited 3.html

Reading data from Kafka in a standalone application

While it is nice to be able to send (and receive, although not shown in this example) messages via Kafka, we may want to query the data in Kafka later.

The code for the command line application to query data from Kafka is contained in the streams folder. It contains a very simple (I am a beginner at this part) application to get the most recent page visits per user. It uses the Kafka Streams API to interact with Kafka.

The Main class calls through to a more interesting DataStoreWrapper class.

    public static void main(String[] args) throws Exception {
        try (DataStoreWrapper dsw = new DataStoreWrapper()) {
            dsw.init();
            Map<String, String> lastPagesByUser = Collections.emptyMap();
            try {
                dsw.readLastVisitedPageByUsers();
            } catch (InvalidStateStoreException e) {
            }
            if (lastPagesByUser.size() == 0) {
                // It seems that although the stream is reported as RUNNING
                // in dsw.init() it still needs some time to settle. Until that
                // happens there is no data or we get InvalidStateStoreException
                Thread.sleep(4000);
                lastPagesByUser = dsw.readLastVisitedPageByUsers();
            }
            System.out.println("Last pages visited:\n" + lastPagesByUser);
        }
    }
}
Note
There is some error handling here. In case you get no entries, or if you get InvalidStateStoreException, try increasing the timeout in the sleep.

Looking at the DataStoreWrapper class, the first thing to note is that it is 'CDI ready'. Although this section will run it as a standalone application where CDI is not relevant, we will reuse this class later in an application deployed in WildFly.

@ApplicationScoped
public class DataStoreWrapper implements Closeable {
    private volatile KafkaStreams streams;

We will initialise this streams instance in the init() method below.

    @Inject
    private ConfigSupplier configSupplier = new ConfigSupplier() {
        @Override
        public String getBootstrapServers() {
            return "localhost:9092";
        }

        @Override
        public String getTopicName() {
            return "page-visits";
        }
    };

The configSupplier field is inititalised to an implementation of ConfigSupplier which hard codes the values of the Kafka bootstrap servers, and the topic name. When deploying this into WildFly later we will use MicroProfile Config to set these values to avoid hard coding them.

    DataStoreWrapper() {
    }

Next, we will take a look at the init() method where we set up the ability to query the stream.

    @PostConstruct
    void init() {
        try {

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configSupplier.getBootstrapServers());
            props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageVisitSerde.class.getName());
            // For this we want to read all the data
            props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

The above sets configuration properties to connect to kafka, and sets Serdes for (de)serializing the Kafka record keys and values. The class PageVisitSerde is used to (de)serialise our PageVisit class from earlier.

We also specify that we want all the data stored on this topic.

            final StreamsBuilder builder = new StreamsBuilder();
            KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore("test-store");
            KTable<String, PageVisit> source = builder.table(
                    configSupplier.getTopicName(),
                    Materialized.<String, PageVisit>as(stateStore)
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new PageVisitSerde()));
            final Topology topology = builder.build();
            this.streams = new KafkaStreams(topology, props);

Now we create a KTable associated with the Kafka topic, and create a StateStore from that. In this case since we are using the Kafka record key (above we used the user for this when sending to Kafka) as the KTable key, we will get one entry (the latest) for each user. Note this is a very simple example, and not an in-depth exploration of the Kafka Streams API, so of course more advanced views on the stored data are possible!

            final CountDownLatch startLatch = new CountDownLatch(1);
            final AtomicReference<KafkaStreams.State> state = new AtomicReference<>();
            streams.setStateListener((newState, oldState) -> {
                state.set(newState);
                switch (newState) {
                    case RUNNING:
                    case ERROR:
                    case PENDING_SHUTDOWN:
                        startLatch.countDown();
                }
            });
            this.streams.start();
            startLatch.await(10, TimeUnit.SECONDS);
            System.out.println("Stream started");

            if (state.get() != KafkaStreams.State.RUNNING) {
                throw new IllegalStateException();
            }

Finally, we start the stream and wait for it to start.

        } catch (Exception e) {
            if (this.streams != null) {
                this.streams.close();
            }
            throw new RuntimeException(e);
        }
    }

The readLastVisitedPageByUsers() method uses the StateStore we set up earlier and returns all the found entries:

    public Map<String, String> readLastVisitedPageByUsers() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, PageVisit>> sqp = StoreQueryParameters.fromNameAndType("test-store", QueryableStoreTypes.keyValueStore());
        final ReadOnlyKeyValueStore<String, PageVisit> store = this.streams.store(sqp);

        Map<String, String> lastPageByUser = new HashMap<>();
        KeyValueIterator<String, PageVisit> it = store.all();
        it.forEachRemaining(keyValue -> lastPageByUser.put(keyValue.key, keyValue.value.getPage()));
        return lastPageByUser;
    }

    @PreDestroy
    public void close() {
        this.streams.close();
    }

}

If you run the application, following the instructions in the example README, you should see output like this:

Stream started
Last pages visited:
{frank=3.html, emma=2.html, linda=3.html}

As already mentioned, this will be the latest page visited for each user.

Reading data from Kafka in a WildFly application

WildFly does not ship with the Kafka Streams API, but we can still deploy the application above into WildFly with some adjustments in how we package it. The example README contains more details, but in a nutshell we:

  • Include the Kafka Streams API jar in our deployment

  • Make sure we don’t include all the Kafka Streams API jar’s transitive dependencies in our deployment since they already exist in WildFly.

  • Modify the deployment’s META-INF/MANIFEST.MF to set up a dependency on the org.apache.kafka.client JBoss Module. This module contains the Kafka client jar, which is needed by the Kafka Streams API.

In our standalone application, we hardcoded the bootstrap servers and the topic name. When deploying to WildFly we would like to avoid recompiling the application if, say, Kafka moves somewhere else, so we specify this information in microprofile-config.properties:

kafka.bootstrap.servers=localhost:9092
kafka.topic=page-visits

We then create an implementation of the ConfigSupplier interface in MpConfigConfigSupplier. This is an ApplicationScoped CDI bean which gets injected with the MicroProfile Config containing the properties from the microprofile-config.properties file:

@ApplicationScoped
public class MpConfigConfigSupplier implements ConfigSupplier {
    @Inject
    Config config;

    @Override
    public String getBootstrapServers() {
        return config.getValue("kafka.bootstrap.servers", String.class);
    }

    @Override
    public String getTopicName() {
        return config.getValue("kafka.topic", String.class);
    }
}

Our DataStoreWrapper class from earlier is a CDI bean, and so our MpConfigConfigSupplier will get injected into its configSupplier field, overwriting the default implementation that was used in the standalone application case:

@ApplicationScoped
public class DataStoreWrapper implements Closeable {
    private volatile KafkaStreams streams;

    @Inject
    private ConfigSupplier configSupplier = new ConfigSupplier() {
        // -- SNIP --
        // This implementation gets replaced by the injected MpConfigConfigSupplier

In order to be able to call this from a client, we add a simple REST endpoint:

@Path("/")
@Produces(MediaType.APPLICATION_JSON)
public class StreamsEndpoint {
    @Inject
    DataStoreWrapper wrapper;

    @GET
    @Path("/last-visited")
    public Map<String, String> getLastVisited() {
        return wrapper.readLastVisitedPageByUsers();
    }
}

This simply delegates to our DataStoreWrapper.

If you deploy the application as outlined in the example README, and visit http://localhost:8080/streams/last-visited you should see output like:

{"frank":"3.html","emma":"2.html","linda":"3.html"}

Conclusion

We have seen how to leverage the new Emitter in MicroProfile Reactive Messaging 2 to push data to MicroProfile Reactive Messaging Streams, and how to send data to Kafka. We also used the new Kafka User API to set the Kafka record key in the data sent to Kafka.

Although we did not receive data from Kafka in this example, we leveraged the Kafka Streams API to read the data we stored in Kafka in a standalone application as well as in an application deployed to WildFly.

References

The WildFly documentation contains more information on the various configuration options for using MicroProfile Reactive Messaging with Kafka in WildFly.

Also, the SmallRye Reactive Messaging Kafka Connector documentation contains a fuller reference of configuration options for Kafka, as well as more information about MicroProfile Reactive Messaging in general.

Finally, the MicroProfile Reactive Messaging specification can be found in the eclipse/microprofile-reactive-messaging GitHub project.