Wednesday 28 September 2022

Let's Kafka

I thought of documenting my Kafka learning process so one might find this helpful. 

How to start the broker:

$ ./zookeeper-server-start.sh -daemon /home/bershath/apps/kafka/kafka_2.13-3.1.0.redhat-00004/config/zookeeper.properties

#Verify the existence of zookeeper

$ jcmd | grep zookeeper

$ ./kafka-server-start.sh -daemon /home/bershath/apps/kafka/kafka_2.13-3.1.0.redhat-00004/config/server.properties

#Verify Kafka is running

$ jcmd | grep kafka


How to send a few messages and receive them to verify the installation

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test

$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning



Tuesday 7 August 2018

Left blank

This page intentionally left blank to recreate JMS 2 specific content without using any confusing private APIs ;-)

Thanks
Ty

Sunday 10 November 2013

Sending and receiving a message using JMS 2 API

Sending a message with JMS 2.0
The JMS 2.0 aka simplified API has reduced the number of JMS objects required to send a message to a destination. The simplified API provides two objects for the said task, JMSProducer and JMSConsumer interfaces respectively. The good thing about them is they both allow method calls to be chained together. 

Let's take a look at the sender code. 

public void sendMessage(String messageText, String destinationAddress){
   ResourceFactory resourceFactory = new ResourceFactoryImpl();
   try(JMSContext jmsContext = resourceFactory.getJMSContext(JMSContext.AUTO_ACKNOWLEDGE);){
Queue queue = jmsContext.createQueue(destinationAddress);
jmsContext.createProducer().send(queue, messageText);
   }
}


A word on JMSContext.createQueue(String queueAddress)
The JMSContext.createQueue(String queueAddress) would not create a physical destination but Queue object for an existing destination. It is encouraged to look up for the administrative Queue object on the JNDI namespace since this approach reduces the chances of portability of the code as we have to provide it with a vendor specific physical Queue name. 

However, in this example, it's required to provide this method with the physical queue name. In HornetQ, it's the queue address name.

On a side note, the JMS API does not provide a mechanism to create permanent destinations. It's up to the JMS implementation to provide a mechanism to create permanent destination(s). This is not mandatory since JMS specification doesn't impose JMS providers to implement such a feature.

However the JMS API allows the creation of temporary destinations. The temporary destination(s) would be removed and garbage collected when the JMS Session/JMSContext associated with the temporary destination gets closed. 


Receiving a message with simplified API 
The JMS receiver code is pretty simple too. One thing you would notice is the ease of obtaining the message payload using JMSConsumer.receiveBody(), without performing any typecasting as it was with the JMS 1.1 specification. Here's the receiver code :

public String receiveTextMessage(String destinationAddress){
   ResourceFactory resourceFactory = new ResourceFactoryImpl();
   try(JMSContext jmsContext = resourceFactory.getJMSContext(JMSContext.AUTO_ACKNOWLEDGE);){
      Queue queue = jmsContext.createQueue(destinationAddress);
      JMSConsumer jmsConsumer = jmsContext.createConsumer(queue);
      return jmsConsumer.receiveBody(String.class);
   } 
}



The ResourceFactory helper interface and its implementation excerpt :

public interface ResourceFactory {
   public ConnectionFactory getConnectionFactory();
   public JMSContext getJMSContext(int sessionTx);
}


public class ResourceFactoryImpl implements ResourceFactory {
....
....
    @Override
    public JMSContext getJMSContext(int sessionTx) {
       ConnectionFactory connectionFactory = getConnectionFactory();
       JMSContext jmsContext = connectionFactory.createContext(JMSHelper.getProperty("jms.user.name"), JMSHelper.getProperty("jms.user.password"),sessionTx);
       return jmsContext;
    }
....
....
}

Friday 8 November 2013

JMSContext in action

One of the key changes in JMS 2.0 (simplified) API is the introduction of JMSContext Interface. JMSContext encapsulates the functionality of both Connection and a Session. This eliminates the need to create a JMS session explicitly, thus it reduces the number of JMS objects needed to send a message. A JMSContext can be used to create JMSConsumer(s), JMSProducer(s) or more JMSContext(s) as preferred.


Starting a JMSContext
With the JMS 1.1 specification, you need to start a connection to receive messages explicitly by invoking connection.start(). The connection objects are in "stopped" mode so you need to start the connection to consume messages. Please mind, you *only* required to start a connection if you're planning to consume messages via the created connection object, else it is *not* required to start a connection. 

The JMS specification is a bit ambiguous on the "state" of the JMSContext. The specification doesn't expect JMS providers to offer a JMSContext at "started" state by default. Instead, it expects JMS providers to auto start the JMSContext at the creation of the first consumer. However, HornetQ offers a JMSContext at started state irrespective to that. You could invoke JMSContext.getAutoStart() to obtain the status of the context.


Connection sharing with JMSContext (How to cache a JMSContext)
On my previous blog post, I explained it's an anti-pattern to create a connection at each message send() when sending messages from a plain JMS client. How could that be possible with JMSContext; since it represents both, session and a connection? 

Yes, this is still doable with JMSContext on an application client. A JMSContext is not recommended to be accessed concurrently to ensure thread safety. However you could still maintain a single physical connection and share it among multiple JMS client threads by issuing each JMS client with a JMSContext. You need to invoke JMSContext.createContext(int sessionMode) for each JMS client; each JMSContext.createContext() invocation would not create a new physical connection but a new JMSContext. When you invoke JMSContext.close() on a JMSContext, it would close just the JMSContext not the physical connection if it's being used by another (active) JMSContext(s). Here's an illustration :

JMSContext jmsContext = resourceFactory.getJMSContext(JMSContext.AUTO_ACKNOWLEDGE);
JMSContext jmsContext2 = jmsContext.createContext(0);
JMSContext jmsContext3 = jmsContext.createContext(0);
jmsContext.close();   
jmsContext3.close();
//The physical connection still active although the initial JMSContext was closed.
Queue queue = jmsContext2.createQueue("A"); 
JMSConsumer jmsConsumer = jmsContext2.createConsumer(queue);


Closing a JMSContext
Invoking JMSContext.close() is suffice to close a connection. The client code does *not* need to close its associated resources, such as JMS consumers and JMS producers. The JMS provider closes all the associated resources when you invoke JMSContext.close().


The invocation of JMSContext.createContext(int sessionMode) is prohibited inside a JEE container as it violates JEE 7 specification. The WEB, EJB applications inside a JEE container are not allowed to have more than an active JMSContext at any given moment. The container would throw JMSRuntimeException if attempted to create more than a single active JMS context. (Please refer JEE 7 Spec, section 6:7)

Thursday 31 October 2013

JMS Connection Factories Explained

What ConnectionFactory should I use in my application ?

I thought it would be good to explain a bit about connection factories before I go on explaining JMS 2.0. The connection factories are one of the two administrative objects offered by JMS API. Both JMS 1.1 and 2.0 support the generic ConnectionFactory. The connection factories support concurrent use just like the Destination administrative object. There are domain specific connection factories in JMS 1.1 spec and they remain unchanged in 2.0. I personally discourage the use of domain specific connection factories just as it was said in the spec. The WildFly application server provides four connectionfactory instances on its default JMS provider, HornetQ's configuration :

<jms-connection-factories>
   <connection-factory name="InVmConnectionFactory">
      <connectors>
         <connector-ref connector-name="in-vm"/>
      </connectors>
      <entries>
         <entry name="java:jboss/DefaultJMSConnectionFactory"/>
         <entry name="java:/ConnectionFactory"/>
      </entries>
   </connection-factory>
   <connection-factory name="ServletConnectionFactory">
      <connectors>
         <connector-ref connector-name="servlet"/>
      </connectors>
      <entries>
         <entry name="java:jboss/exported/jms/ServletConnectionFactory"/>
      </entries>
   </connection-factory>
   <connection-factory name="RemoteConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
      </entries>
     <confirmation-window-size>100</confirmation-window-size>
   </connection-factory>
   <pooled-connection-factory name="hornetq-ra">
      <transaction mode="xa"/>
      <connectors>
         <connector-ref connector-name="in-vm"/>
      </connectors>
      <entries>
         <entry name="java:/JmsXA"/>
      </entries>
   </pooled-connection-factory>
</jms-connection-factories>

The connection factory instances differentiate by their characteristics. I would rather explain in terms of transport layer and on application type to aid what instance to be used in a client application. 

Remote JMS clients :
I would recommend the factory bound at jms/RemoteConnectionFactory or jms/ServletConnectionFactory for remote JMS clients. The latter uses servlet transport over the configured http port where as the former uses netty on port 5445. The http bound connection factory resolves the need of opening a port on the firewall.

Clients inside the same container :
The clients who reside the in the same container should obtain connections from the JCA based connection factory; from the factory bound at java:/JmsXA. It is considered an anti-pattern to obtain connections other than the factory bound at java:/JmsXA . When you obtain connections from this factory, you're indeed obtaining connections from the ManagedConnectionPool (MCP).

When you obtain connections from the factory bound at java:/JmsXA, it's not only just a method call but the connection objects support two phase commit protocol. Hence, they can be enlisted in a global transaction. Further you're obtaining connections from JCA's MCP,  with the container managed security. 

However there's a limitation imposed by the JEE 7 specification, under the section 6:7. It does not permit EJB/Web applications to have more than a single active session from a connection obtained from the MCP. This restriction does not encourage JMS clients to use a JVM based connection factory inside the container, which is meant for legacy JMS applications. Instead, the users must make sure to reuse JMS objects efficiently since JMS objects are reusable.

The purpose of using a JMS resource adapter is compromised and nullified when JMS clients obtain connections from anywhere other than the managed connection pool; when they JMS clients are inside the same container. It is a bad practice use any connection factory than the JCA based factory.

The JMS ConnectionFactory and Connection objects can be concurrently accessed and reusable. The plain JMS clients must make sure to cache and reuse connection objects instead of obtaining a connection at each message send. Although session objects are reusable, I would not encourage them to be shared across threads, to assure thread safety. Further, it might have an adverse impact on transacted sessions. However, you don't have to cache JMS resources inside the container, the container does this for you when you obtain connections from the MCP.

Tuesday 1 October 2013

adding JMS support to the stock standalone.xml in wildfly 8

The stock standalone.xml configuration doesn't contain JMS support by default but standalone-full.xml. I thought of adding the messaging subsystem to the default configuration to make it easy. Here's how to do it :

The WildFly server would instantiate the JMS subsystem when you simply add the messaging subsystem with <hornetq-server />, as follows :

<subsystem xmlns="urn:jboss:domain:messaging:1.4">
   <hornetq-server />
</subsystem>

However, adding the above configuration alone would not suffice. You have to tell the server to load the messaging module, which contains necessary libraries to host the JMS subsystem. We have to define the messaging module under the list of "extentions".

<extensions>
    ....
    ....
    <extension module="org.jboss.as.messaging"/>
</extensions>

Although this configuration would accommodate JMS support in the server, it would still not support MDB deployment. The next step to define and configure an MDB container. 
   
<subsystem xmlns="urn:jboss:domain:ejb3:2.0">
   ....
   ....
   <mdb>
      <resource-adapter-ref resource-adapter-name="hornetq-ra"/>
      <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
   </mdb>
   <pools>
      <bean-instance-pools>
         <strict-max-pool name="slsb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
         <strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
      </bean-instance-pools>
   </pools>
</subsystem>

The bean instance pools have already been defined in the stock standlone.xml file but you need to configure the MDB container within <mdb />. 

installing wildfly 8 alpha 4

JEE 7 with WildFly 8

I thought of playing around JEE7 on the upcoming WildFly 8. At the writing of this, WF-8-Alpha4 was the latest available and there's no GA. I will be starting with basics of WF and as I move on, I believe it should be ready and comply with the full JEE 7 TCK. Since I do have a very short lived memory, I believe writing every little detail would help myself in the long run. I doubt I would explore all components of JEE 7 stack, instead I would probably focus on JCA 1.7, Transactions 1.2, JMS 2.0 and EJB 3.2.

Installation :
WildFly-8 Alpha 4 can be downloaded from the community web at the following location :
http://www.wildfly.org/download/ or directly from 
http://download.jboss.org/wildfly/8.0.0.Alpha4/wildfly-8.0.0.Alpha4.zip

The installation is pretty easy, just unzip the downloaded wildfly-8.0.0.Alpha4.zip to a preferred directory. The server would start the default standlone.xml configuration should you run ./standalone.sh from the $WF_HOME/bin directory. To list all available startup options, please try :
[bershath@shimmer bin]$ ./standalone.sh --help

To bind the application server to a specific port :
[bershath@shimmer bin]$ ./standalone.sh -b 192.168.1.50

The standalone.conf in the bin directory contains a few startup options, it's recommended to tweak these attributes to accommodate specific memory or user requirements.