Quick Start
Assuming you have kafka available at localhost:9092 and it is not secured (no TLS or ACLs).
Basic JMS Operations
- acquire a release archive
- unzip the archive into a new directory
- change directory to the newly unzipped one
- set JAVA_HOME to a java 1.8 JDK installation
- make the
bin/files executable - start the server as a daemon,
jms-bridge-server-start -daemon etc/jms-bridge/quick-start-basic-jms.conf - open up a new shell in the current directory
- use the
jms-bridgecommand to start a consumer - from the new shell use the
jms-bridgecommand to start publishing messages - once satisfied close out the consumer and producer
- shutdown the server by executing the
jms-bridge-server-stopscript
By Command
mkdir jms-bridge-qscd jms-bridge-qscp ~/Downloads/jms-bridge-*.zip ./unzip jms-bridge-*export JAVA_HOME=<java-1.8-install-dir>chmod -R a=rx,o+w bin/bin/jms-bridge-server-start -daemon etc/jms-bridge/quick-start-basic-jms.conf`bin/jms-bridge jms receive --url tcp://localhost:61616 --topic quick-startIn a new shell from the same directory
export JAVA_HOME=<java-1.8-install-dir>bin/jms-bridge jms send --url tcp://localhost:61616 --topic quick-startmy first messagemy second messagequitIn the previous shell
bin/jms-bridge-server-stopKafka JMS Integration
Assuming you have kafka avaliable at localhost:9092 and it is not secured (no TLS or ACLs).
For this we’ll need to use some of the standard Kafka tooling to manage topics and produce/consume from them.
Assuming kafka-topics, kafka-console-consumer and kafka-console-producer are available.
- acquire a release archive
- unzip the archive into a new directory
- change directory to the newly unzipped one
- set JAVA_HOME to a java 1.8 JDK installation
- make the
bin/files executable - create a kafka topic called
quick-start - start the server,
jms-bridge-server-start etc/jms-bridge/quick-start-kafka-integration.conf - open up 4 new shells from the current directory
- in the first shell use the
jms-bridge jms receivecommand to start a consumer consuming from thekafka.quick-starttopic - in the second shell start a
kafka-console-consumerconsuming from thequick-starttopic - in the third shell use the
jms-bridge jms sendcommand to start publishing messages to thekafka.quick-starttopic - now from the third shell publish some messages, you should see them showing up in the second shell
- in the fourth shell start a
kafka-console-producerpublishing to thequick-starttopic - from the fourth shell publish some messages, they should show up in the first shell
- repeat consuming/producing as much as desired then
control-ceach process and close shells 1, 2, and 4 - finally shutdown the server using
control-cin the original shell (may need to do it twice)
By Command
In original shell
mkdir jms-bridge-qscd jms-bridge-qscp ~/Downloads/jms-bridge-*.zip ./unzip jms-bridge-*export JAVA_HOME=<java-1.8-install-dir>chmod -R a=rx,o+w bin/cd jms-bridge-*kafka-topics --bootstrap-server localhost:9092 --create --topic quick-start --partitions 3 --replication-factor 1bin/jms-bridge-server-start etc/jms-bridge/quick-start-kafka-integration.confOpen up 4 new shells from the same directory as the original.
In shell 1
bin/jms-bridge jms receive --topic kafka.quick-startIn shell 2
kafka-console-consumer --bootstrap-server localhost:9092 --topic quick-startIn shell 3
bin/jms-bridge jms send --topic kafka.quick-startHello KafkaObserve in shell 2 that the “Hello Kafka” message appears.
In shell 4
kafka-console-producer --broker-list localhost:9092 --topic quick-startHello JMSObserve in shell 1 that the “Hello JMS” message appears.
Continue publishing messages from shell 3 and 4 until you are happy.
Close shells 1, 2, 3, and 4.
In the original shell send the TERM signal to the JMS-Bridge by using control-c.
Request / Reply Pattern
In JMS there two different messaging style supported, publish-subscribe (pubsub) and point-to-point (ptp). This differs from Kafka which only supports the pubsub model. Since the JMS Bridge resides in both worlds it can be used to facilitate a ptp like interaction between JMS clients and Kafka clients.
In this example a JMS client will be performing a synchronous request expecting a reply while the Kafka client will be asynchronously responding to it. From the JMS client’s point of view nothing is unusual since it already supports the ptp model. The Kafka client, on the other hand, will need to do a little extra work to tie the request to the response.
This example is done in Java and will require the reader to know enough about java development to finish the code, compile it and then execute it.
Here’s an example main method for the JMS client:
try ( //acquire a JMS Session Session session = amqServer.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE) ) {
// The JMS topic connected to the Kafka topic that our Kafka client will be responding from Topic requestAmqTopic = session.createTopic("kafka.quick-start-request");
TopicSession topicSession = (TopicSession) session; TopicRequestor requestor = new TopicRequestor(topicSession, requestAmqTopic);
try { String request = "Hello, what's your name?"; TextMessage tmsg = session.createTextMessage(request); System.out.println("Request: " + request); Message response = requestor.request(tmsg); System.out.println("Response: " + response.getBody(String.class)); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); }
System.out.println("jms disconnected."); } }Here for the Kafka client:
try ( KafkaProducer<byte[], String> kproducer = new KafkaProducer<>( kprops, new ByteArraySerializer(), new StringSerializer()); KafkaConsumer<byte[], String> kconsumer = new KafkaConsumer<>( kprops, new ByteArrayDeserializer(), new StringDeserializer()); ) {
kconsumer.subscribe(Collections.singleton("quick-start-request"));
while (true) { ConsumerRecords<byte[], String> pollRecords = kconsumer.poll(Duration.ofMillis(100L)); if (pollRecords != null) { pollRecords.forEach(request -> {
//Extract the JMSReployTo header value, should refer to a temporary topic Header replyTo = request.headers().lastHeader("jms.JMSReplyTo"); final byte[] destination = replyTo != null ? replyTo.value() : null;
String responseValue = "Hi, my name is Kafka"; System.out.println(" Response: " + responseValue);
ProducerRecord<byte[], String> response = new ProducerRecord( "quick-start-response", request.key(), responseValue);
if (destination != null) { //set the destination to that temporary topic from the request header response.headers().add("jms.JMSDestination", destination); }
//synchronous publish for example only, not recommended, use async version with callback instead kproducer.send(response).get(); }); } } }For the example to work some preparatory tasks need to be completed.
- Create the
quick-start-requestandquick-start-responsetopics in Kafka. - Start the JMS Bridge using the
etc/jms-bridge/quick-start-kafka-request-reply.confconfiguration with the correctbootstrap.servers.