Tuesday 13 March 2018

RabbitMQ: How to test a queue exists

In a RabbitMQ cluster, a queue may be replicated on only some of the nodes. If those nodes are down, the queue does not work even though other nodes are up and running.

By sending a message to RabbitMQ we can see whether the exchange node is up.
With a Spring RabbitTemplate we can use:

convertAndSend(java.lang.Object object)
Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.

If the RabbitMQ cluster is down, this method throws an AmqpException.

We could try to send a message to a specific queue by using:

convertAndSend(java.lang.String routingKey, java.lang.Object object)
Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.

But if the queue named by routingKey does not exist, the method does not throw an AmqpException as in the previous case; it returns silently.

To actually test whether a queue exists (up and running), there is an approach that Spring itself uses in RabbitAdminIntegrationTest:

/**
 * Verify that a queue exists using the native Rabbit API to bypass all the connection and channel caching and callbacks in Spring AMQP.
 *
 * @param queue The queue to verify
 * @return True if the queue exists
 */
private boolean queueExists(final Queue queue) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(BrokerTestUtils.getPort());
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    try {
        DeclareOk result = channel.queueDeclarePassive(queue.getName());
        return result != null;
    } catch (IOException e) {
        return e.getCause().getMessage().contains("RESOURCE_LOCKED");
    } finally {
        connection.close();
    }
}
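One fragile spot in the method above is the catch block: e.getCause().getMessage() throws a NullPointerException when the IOException has no cause or the cause has no message. A minimal, JDK-only sketch of a null-safe variant of that check follows. The class and method names (BrokerErrorCheck, causeChainContains) are hypothetical and not part of Spring AMQP or the RabbitMQ client, and the exceptions in main only mimic the shape of a failed passive declare.

```java
import java.io.IOException;

final class BrokerErrorCheck {

    /** Returns true if any message in the cause chain contains the given marker text. */
    static boolean causeChainContains(Throwable error, String marker) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical exceptions standing in for what the client might throw.
        IOException locked = new IOException(new RuntimeException("RESOURCE_LOCKED - exclusive queue"));
        IOException bare = new IOException("connection reset"); // no cause at all
        System.out.println(causeChainContains(locked, "RESOURCE_LOCKED"));
        System.out.println(causeChainContains(bare, "RESOURCE_LOCKED"));
    }
}
```

In queueExists the catch block could then return causeChainContains(e, "RESOURCE_LOCKED") instead of dereferencing the cause directly.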

To replicate every queue whose name starts with "ha." to all nodes in the cluster, we can use the following command:

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
Starting from RabbitMQ 3.0 this is the only way; programmatic definitions of policies in XML files are ignored.
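The second argument of set_policy is a regular expression that is matched against queue names, so "^ha\." selects names beginning with the literal prefix "ha." and not every name that merely starts with the letters "ha". The sketch below illustrates this with java.util.regex, used here only as a stand-in for the broker's own regex engine. The queue names and the class PolicyPatternDemo are made up for the example.

```java
import java.util.regex.Pattern;

final class PolicyPatternDemo {

    // Same expression as in the rabbitmqctl command above.
    static final Pattern HA_PATTERN = Pattern.compile("^ha\\.");

    /** True if the pattern is found in the queue name (a search, not a full match). */
    static boolean matchesPolicy(String queueName) {
        return HA_PATTERN.matcher(queueName).find();
    }

    public static void main(String[] args) {
        // Hypothetical queue names.
        for (String name : new String[] {"ha.orders", "hardware", "orders.ha.eu"}) {
            System.out.println(name + " -> " + matchesPolicy(name));
        }
    }
}
```

Only "ha.orders" matches here: "hardware" has no dot after "ha", and "orders.ha.eu" fails the "^" anchor.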

Friday 8 December 2017

Wildfly Configure Hornetq to receive bytes message from remote queue

The first step is to install HornetQ and configure a queue in /config/stand-alone/non-clustered/hornetq-jms.xml:

   <queue name="testQueue">
       <entry name="queue/testQueue"/>
       <durable>false</durable>
   </queue>

In the same file we can change the remote port specified in the netty connector.

Keep all properties inside a JmsContext:

public class JmsContext {

    public static final String FILE_NAME_ATTRIBUTE = "FILE_NAME";
    public static final String FILE_CODE_ATTRIBUTE = "FILE_CODE";

    private String initialContextFactory;
    private String remoteConnectionfactoryUrl;
    private String importQueueUrl;

    private String serverUrl;
    private String userName;
    private String password;

    private String fileName;
    private String fileCode;
    private byte[] bytes;
    // getters and setters
}

Create a JmsHelper class to send the message:

import org.apache.log4j.Logger;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.util.Properties;

public class JmsHelper {

    private static final Logger LOG = Logger.getLogger(JmsHelper.class);

    public static void sendMessage(JmsContext jmsContext) {

        Connection connection = null;
        Session session = null;
        MessageProducer msgProducer = null;

        try {

            LOG.info("Running " + JmsHelper.class.toString() + 
                     " on current path " + new File(".").getAbsolutePath());
            LOG.info("Publishing " + jmsContext.getFileName() + 
                     " to destination '" + jmsContext.getImportQueueUrl() + "'\n");

            final Properties initialContextProperties = new Properties();
            initialContextProperties.put(Context.INITIAL_CONTEXT_FACTORY, 
                     jmsContext.getInitialContextFactory());
            initialContextProperties.put(Context.PROVIDER_URL, 
                     jmsContext.getServerUrl());
            initialContextProperties.put(Context.SECURITY_PRINCIPAL, 
                     jmsContext.getUserName());
            initialContextProperties.put(Context.SECURITY_CREDENTIALS, 
                     jmsContext.getPassword());

            final InitialContext ic = new InitialContext(initialContextProperties);
            LOG.info("Initial context created.");
            ConnectionFactory factory = (ConnectionFactory)ic.lookup(
                              jmsContext.getRemoteConnectionfactoryUrl());
            LOG.info("Connection factory " + jmsContext.getRemoteConnectionfactoryUrl() + 
                              " acquired.");

            final Queue queue = (Queue) ic.lookup(jmsContext.getImportQueueUrl());
            LOG.info("Found queue: " + jmsContext.getImportQueueUrl());

            connection = factory.createConnection(jmsContext.getUserName(), 
                             jmsContext.getPassword());
            session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            msgProducer = session.createProducer(queue);

            BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty(JmsContext.FILE_NAME_ATTRIBUTE, 
                             jmsContext.getFileName());
            msg.setStringProperty(JmsContext.FILE_CODE_ATTRIBUTE, 
                             jmsContext.getFileCode());

            msg.writeBytes(jmsContext.getBytes());

            msgProducer.send(msg);
            LOG.info("Bytes file message was published to server.");

        } catch (Exception e) {
            LOG.error("Failed to publish the bytes message", e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Create the test class that connects directly to the HornetQ server (note the server URL and the initial context factory class):

import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JmsHornetqTest {

    private String fileName = "...";
    private String filePath = "...";
    private String fileCode = "E1";
    private String serverUrl = "jnp://localhost:1099";//"remote://localhost:5445";    
    private String initialContextFactory = "org.jnp.interfaces.NamingContextFactory";
    private String remoteConnectionFactoryUrl = "/ConnectionFactory";
    private String hornetqUser = "guest";
    private String hornetqPassword = "guest";
    private String hornetqImportQueueUrl = "/queue/testQueue";

    public void sendMessage() {

        try {

            JmsContext jmsContext = new JmsContext();
            jmsContext.setServerUrl(serverUrl);
            jmsContext.setUserName(hornetqUser);
            jmsContext.setPassword(hornetqPassword);
            jmsContext.setRemoteConnectionfactoryUrl(remoteConnectionFactoryUrl);
            jmsContext.setInitialContextFactory(initialContextFactory);
            jmsContext.setImportQueueUrl(hornetqImportQueueUrl);
            jmsContext.setFileName(fileName);
            jmsContext.setFileCode(fileCode);
            jmsContext.setBytes(getBytes());

            JmsHelper.sendMessage(jmsContext);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private byte[] getBytes() throws IOException {
        Path fileLocation = Paths.get(filePath + File.separator + fileName);
        return Files.readAllBytes(fileLocation);
    }

    public static void main(String[] args) throws JMSException {
        new JmsHornetqTest().sendMessage();
    }
}
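The getBytes method above builds the path by concatenating the directory and the file name with File.separator. As a small alternative sketch, Paths.get accepts the two segments separately and inserts the separator itself. FileBytesDemo and its methods are hypothetical helper names, and the round trip through a temporary file exists only to make the example self-checking.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class FileBytesDemo {

    /** Joins directory and file name without manual separator handling. */
    static Path locate(String directory, String fileName) {
        return Paths.get(directory, fileName);
    }

    /** Reads the whole file, rethrowing I/O failures as unchecked exceptions. */
    static byte[] read(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the payload to a temporary file and reads it back through locate(). */
    static byte[] roundTrip(byte[] payload) {
        try {
            Path tmp = Files.createTempFile("jms-payload", ".bin");
            try {
                Files.write(tmp, payload);
                return read(locate(tmp.getParent().toString(), tmp.getFileName().toString()));
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] back = roundTrip(new byte[] {1, 2, 3});
        System.out.println("Read back " + back.length + " bytes");
    }
}
```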

Now the only thing left is to configure HornetQ in our Wildfly application (jms-context.xml) to listen for and receive the bytes message. We do not specify the destination itself, only the destination name from the HornetQ server and a destination resolver:

<bean id="jmsHQImportConnectionFactory" lazy-init="true" 
            class="org.hornetq.jms.client.HornetQConnectionFactory">
    <constructor-arg value="true"/>
    <constructor-arg>
        <list>
            <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
                <constructor-arg>
                    <map key-type="java.lang.String" value-type="java.lang.Object">                        
                        <entry key="host" value="localhost"/>
                        <entry key="port" value="5445"/>
                    </map>
                </constructor-arg>
            </bean>

            <!-- more beans if we have backup servers -->
        </list>
    </constructor-arg>

    <property name="reconnectAttempts" value="-1"/>
    <property name="failoverOnInitialConnection" value="true"/>
    <property name="initialConnectAttempts" value="15"/>
    <property name="consumerWindowSize" value="0"/>
</bean>
<bean id="jmsHQImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="jmsHQImportConnectionFactory" />
    <property name="username" value="guest" />
    <property name="password" value="guest" />
</bean>
<bean id="jmsHQImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
</bean>
<bean id="jmsHQImportDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="cache" value="true"/>
    <property name="fallbackToDynamicDestination" value="true"/>
</bean>
<bean id="jmsHQImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
    <property name="transactionManager" ref="jmsHQImportTransactionManager"/>
    <property name="destinationName" value="testQueue"/>
    <property name="destinationResolver" ref="jmsHQImportDestinationResolver"/>
    <property name="messageListener" ref="jmsImportMessageListener"/>
</bean>

Wednesday 29 November 2017

Wildfly Remote JMS

I wanted to send/import a file using JMS to a Wildfly server application. To be able to do this I needed to do the following:

1. Add user "test" with password "test@1975" as an Application User to the Wildfly server using the add-user script from the bin folder. Answer yes when asked whether the user will be used for one AS process to connect to another.

2. Configure standalone-full.xml

The messaging-activemq subsystem is defined here:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
    <server name="default">
        <security-setting name="#">
            <role name="guest" delete-non-durable-queue="true" 
                create-non-durable-queue="true" 
                consume="true" send="true"/>
        </security-setting>
        <address-setting name="#" message-counter-history-day-limit="10" 
                page-size-bytes="2097152" max-size-bytes="10485760" 
                expiry-address="jms.queue.ExpiryQueue" 
                dead-letter-address="jms.queue.DLQ"/>
        <http-connector name="http-connector" endpoint="http-acceptor" 
                socket-binding="http"/>
        <http-connector name="http-connector-throughput" 
                endpoint="http-acceptor-throughput" socket-binding="http">
            <param name="batch-delay" value="50"/>
        </http-connector>
        <in-vm-connector name="in-vm" server-id="0"/>
        <http-acceptor name="http-acceptor" http-listener="default"/>
        <http-acceptor name="http-acceptor-throughput" http-listener="default">
            <param name="batch-delay" value="50"/>
            <param name="direct-deliver" value="false"/>
        </http-acceptor>
        <in-vm-acceptor name="in-vm" server-id="0"/>
        <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
        <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>        
        <jms-queue name="testImport" entries="java:jboss/exported/jms/queue/testImport"/>
        <connection-factory name="InVmConnectionFactory" 
                entries="java:/ConnectionFactory" connectors="in-vm"/>
        <connection-factory name="RemoteConnectionFactory" 
                entries="java:jboss/exported/jms/RemoteConnectionFactory" 
                connectors="http-connector"/>
        <pooled-connection-factory name="activemq-ra" transaction="xa" 
                entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" 
                connectors="in-vm"/>
    </server>
</subsystem>
The most important parts here are the definitions of the testImport queue and of RemoteConnectionFactory.

Because we use remote JMS, the remoting-connector must be defined in the remoting subsystem:

<subsystem xmlns="urn:jboss:domain:remoting:3.0">
    <endpoint/>
    <http-connector name="http-remoting-connector" connector-ref="default" 
                security-realm="ApplicationRealm"/>
    <connector name="remoting-connector" socket-binding="remoting" 
                security-realm="ApplicationRealm"/>
</subsystem>

This socket binding must also be present in the socket-binding-group, where the remote port, 4447, is specified:

<socket-binding-group name="standard-sockets" default-interface="public" 
                port-offset="${jboss.socket.binding.port-offset:0}">
    <socket-binding name="management-http" interface="management" 
                port="${jboss.management.http.port:9990}"/>
    <socket-binding name="management-https" interface="management" 
                port="${jboss.management.https.port:9993}"/>
    <socket-binding name="ajp" port="${jboss.ajp.port:8009}"/>
    <socket-binding name="http" port="${jboss.http.port:8080}"/>
    <socket-binding name="https" port="${jboss.https.port:8443}"/>
    <socket-binding name="remoting" interface="public" port="4447"/>
    <socket-binding name="iiop" interface="unsecure" port="3528"/>
    <socket-binding name="iiop-ssl" interface="unsecure" port="3529"/>
    <socket-binding name="txn-recovery-environment" port="4712"/>
    <socket-binding name="txn-status-manager" port="4713"/>
    <outbound-socket-binding name="mail-smtp">
        <remote-destination host="localhost" port="25"/>
    </outbound-socket-binding>
</socket-binding-group>
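Before debugging JNDI or JMS errors, it can help to confirm that something is actually listening on the remoting port configured above. The following is a minimal JDK-only sketch, assuming the client's provider URL has the form remote://host:port. ProviderUrlCheck and its methods are hypothetical names, and a successful TCP connect only shows that the port is open, not that the remoting connector or the security realm is set up correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

final class ProviderUrlCheck {

    /** Extracts the port from a provider URL such as remote://localhost:4447. */
    static int portOf(String providerUrl) {
        return URI.create(providerUrl).getPort();
    }

    /** Extracts the host from the same kind of URL. */
    static String hostOf(String providerUrl) {
        return URI.create(providerUrl).getHost();
    }

    /** True if a TCP connection to the URL's host and port succeeds within the timeout. */
    static boolean isListening(String providerUrl, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(hostOf(providerUrl), portOf(providerUrl)), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String url = "remote://localhost:4447"; // port taken from the socket binding above
        System.out.println(url + " listening: " + isListening(url, 2000));
    }
}
```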

3. Create the JMS context XML file (jms-import-context.xml). This file is usually registered in web.xml as a classpath context location:

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"       
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <bean name="jmsImportConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/RemoteConnectionFactory"/>
        <property name="expectedType" value="javax.jms.ConnectionFactory"/>
    </bean>

    <bean id="jmsImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="jmsImportConnectionFactory" />
        <property name="username" value="test" />
        <property name="password" value="test@1975" />
    </bean>

    <bean id="jmsImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
    </bean>

    <bean name="jmsImportDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/queue/testImport"/>
    </bean>

    <bean id="jmsImportMessageListener" class="eterra.xinterface.jms.ImportJmsMessageListener">
        <property name="dispatcher" ref="dataImportProcessDispatcher"/>
        <property name="userDao" ref="userDao"/>
    </bean>

    <bean id="jmsImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="1"/>
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
        <property name="transactionManager" ref="jmsImportTransactionManager"/>
        <property name="destination" ref="jmsImportDestination"/>
        <property name="messageListener" ref="jmsImportMessageListener"/>
    </bean>

</beans>

Here we can see the remote connection factory URL, java:jboss/exported/jms/RemoteConnectionFactory, the Wildfly user credentials, and the name of the remote destination, java:jboss/exported/jms/queue/testImport. The class ImportJmsMessageListener accepts JMS BytesMessage requests.

4. Configure jboss-service.xml to contain the testImport queue:

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=testImport">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
</server>

5. Create the ImportJmsMessageListener class. Besides the file content, the message can carry useful properties such as FILE_NAME and FILE_TYPE:

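public class ImportJmsMessageListener implements MessageListener, InitializingBean {

 // Logger; log4j is assumed here
 private static final Logger LOG = Logger.getLogger(ImportJmsMessageListener.class);

 // Names of the message properties set by the sender
 private static final String FILE_NAME = "FILE_NAME";
 private static final String FILE_TYPE = "FILE_TYPE";

 private String fileName;
 private String fileType;

 // setters for dispatcher and userDao and afterPropertiesSet() are omitted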

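 @Override public void onMessage(Message message) {

   LOG.info("Read message file from JMS ...");
   try {
     byte[] bytes = readBytesFromJMSMessage(message);
     LOG.info("Message bytes read: " + bytes.length);
     // do what you need
   } catch (JMSException e) {
     // onMessage cannot throw checked exceptions, so handle the failure here
     LOG.error("Could not read the JMS message", e);
   }
 }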


 /**
  * Reads the body of a JMS {@link Message} as a byte[].
  * @param msg JMS {@link Message}
  * @return the byte[] read.
  * @throws JMSException if an error occurs while trying to read the body content.
  */
 public byte[] readBytesFromJMSMessage(final Message msg) throws JMSException {
    if (!(msg instanceof BytesMessage)) {
        throw new JMSException("Message is not a BytesMessage!");
    }
    BytesMessage bsMsg=(BytesMessage)msg;

    fileName = bsMsg.getStringProperty(FILE_NAME);
    if (fileName == null) {
        throw new JMSException(FILE_NAME + " property not specified!");
    }

    fileType = bsMsg.getStringProperty(FILE_TYPE);
    if (fileType == null) {
        throw new JMSException(FILE_TYPE + " property not specified!");
    }

    if (bsMsg.getBodyLength() >= Integer.MAX_VALUE) {
        throw new JMSException("Message body is too large[" + bsMsg.getBodyLength() + "].");
    }
    byte[] ba=new byte[(int)bsMsg.getBodyLength()];
    bsMsg.readBytes(ba);
    return ba;
 }
}

6. Create a test class. The most important point here is that the remote connection factory URL and the queue URL are used without the java:jboss/exported/ prefix with which they are defined in the server XML configuration. To run the test class standalone, add jboss-client.jar from the wildfly/bin/client folder to the classpath:

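import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class JmsImportTest {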

    private String FILE_NAME = "myfile.xml";
    private String FILE_PATH = "C:\\Projects";
    private String FILE_TYPE = "MY_TYPE";

    private String initialContextFactory = "org.jboss.naming.remote.client.InitialContextFactory";
    private String serverUrl = "remote://localhost:4447";
    private String userName = "test";
    private String password = "test@1975";
    private String remoteConnectionfactoryUrl = "jms/RemoteConnectionFactory";
    private static String importQueueUrl = "jms/queue/testImport";

    public void sendMessage(String queueName) {

        Connection connection = null;
        Session session = null;
        MessageProducer msgProducer = null;

        try {

            System.out.println("Running " + JmsImportTest.class.toString() + " on current path " + new File(".").getAbsolutePath());
            System.out.println("Publishing " + FILE_NAME + " to destination '" + queueName + "'\n");

            final Properties initialContextProperties = new Properties();
            initialContextProperties.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
            initialContextProperties.put(Context.PROVIDER_URL, serverUrl);
            initialContextProperties.put(Context.SECURITY_PRINCIPAL, userName);
            initialContextProperties.put(Context.SECURITY_CREDENTIALS, password);

            final InitialContext ic = new InitialContext(initialContextProperties);
            ConnectionFactory factory = (ConnectionFactory)ic.lookup(remoteConnectionfactoryUrl);

            final Queue queue = (Queue) ic.lookup(queueName);

            connection = factory.createConnection(userName, password);
            session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            msgProducer = session.createProducer(queue);

            BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty("FILE_NAME", FILE_NAME);
            msg.setStringProperty("FILE_TYPE", FILE_TYPE);

            Path fileLocation = Paths.get(FILE_PATH + File.separator + FILE_NAME);
            byte[] data = Files.readAllBytes(fileLocation);
            msg.writeBytes(data);

            msgProducer.send(msg);
            System.out.println("Bytes file message was published to server.");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws JMSException {
        new JmsImportTest().sendMessage(importQueueUrl);
    }
}
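The naming rule from step 6 can be sketched as a small helper: entries bound under java:jboss/exported/ on the server are looked up by a remote client with that prefix removed. ExportedJndiNames and toRemoteName are hypothetical names used only for illustration; nothing like this is needed at runtime, since the client simply uses the shortened names directly.

```java
final class ExportedJndiNames {

    static final String EXPORTED_PREFIX = "java:jboss/exported/";

    /** Converts a server-side entry to the name a remote client looks up. */
    static String toRemoteName(String serverEntry) {
        if (!serverEntry.startsWith(EXPORTED_PREFIX)) {
            throw new IllegalArgumentException("Not an exported entry: " + serverEntry);
        }
        return serverEntry.substring(EXPORTED_PREFIX.length());
    }

    public static void main(String[] args) {
        // Entries as defined in standalone-full.xml in step 2.
        System.out.println(toRemoteName("java:jboss/exported/jms/RemoteConnectionFactory"));
        System.out.println(toRemoteName("java:jboss/exported/jms/queue/testImport"));
    }
}
```

The two printed names are the values assigned to remoteConnectionfactoryUrl and importQueueUrl in the test class above.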