Wednesday 29 November 2017

Wildfly Remote JMS

I wanted to send/import a file using JMS to a Wildfly server application. To be able to do this I needed to do the following:

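1. Add the user "test" with password "test@1975" as an Application User on the WildFly server using the add-user script from the bin folder. When the script asks whether this user will be used for one AS process to connect to another, answer yes. The user should also belong to the "guest" group, because the messaging security settings shown below grant the send/consume permissions to the "guest" role.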

2. Configure standalone-full.xml

The messaging-activemq subsystem is defined here:

<!-- messaging-activemq subsystem as configured in standalone-full.xml. -->
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
    <server name="default">

        <!-- Permissions of the "guest" role for the "#" address pattern. -->
        <security-setting name="#">
            <role name="guest"
                  send="true"
                  consume="true"
                  create-non-durable-queue="true"
                  delete-non-durable-queue="true"/>
        </security-setting>

        <!-- Paging, expiry and dead-letter settings for the "#" address pattern. -->
        <address-setting name="#"
                         dead-letter-address="jms.queue.DLQ"
                         expiry-address="jms.queue.ExpiryQueue"
                         max-size-bytes="10485760"
                         page-size-bytes="2097152"
                         message-counter-history-day-limit="10"/>

        <!-- Connectors -->
        <http-connector name="http-connector"
                        socket-binding="http"
                        endpoint="http-acceptor"/>
        <http-connector name="http-connector-throughput"
                        socket-binding="http"
                        endpoint="http-acceptor-throughput">
            <param name="batch-delay" value="50"/>
        </http-connector>
        <in-vm-connector name="in-vm" server-id="0"/>

        <!-- Acceptors -->
        <http-acceptor name="http-acceptor" http-listener="default"/>
        <http-acceptor name="http-acceptor-throughput" http-listener="default">
            <param name="batch-delay" value="50"/>
            <param name="direct-deliver" value="false"/>
        </http-acceptor>
        <in-vm-acceptor name="in-vm" server-id="0"/>

        <!-- Queues; testImport is the queue used for the file import. -->
        <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
        <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
        <jms-queue name="testImport"
                   entries="java:jboss/exported/jms/queue/testImport"/>

        <!-- Connection factories; RemoteConnectionFactory is bound under
             java:jboss/exported and uses the http-connector. -->
        <connection-factory name="InVmConnectionFactory"
                            connectors="in-vm"
                            entries="java:/ConnectionFactory"/>
        <connection-factory name="RemoteConnectionFactory"
                            connectors="http-connector"
                            entries="java:jboss/exported/jms/RemoteConnectionFactory"/>
        <pooled-connection-factory name="activemq-ra"
                                   transaction="xa"
                                   connectors="in-vm"
                                   entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory"/>
    </server>
</subsystem>
The most important things here are the definition of testImport queue and the definition of RemoteConnectionFactory.

Because we use remote JMS, a remoting connector must be defined in the remoting subsystem:

<!-- Remoting subsystem: both connectors authenticate against ApplicationRealm. -->
<subsystem xmlns="urn:jboss:domain:remoting:3.0">
    <endpoint/>
    <http-connector name="http-remoting-connector"
                    security-realm="ApplicationRealm"
                    connector-ref="default"/>
    <!-- Uses the socket binding named "remoting". -->
    <connector name="remoting-connector"
               security-realm="ApplicationRealm"
               socket-binding="remoting"/>
</subsystem>

The socket binding referenced by this connector ("remoting") must also be defined in the socket-binding-group, where the remote port, 4447, is specified:

<!-- Socket bindings; ports are shifted by jboss.socket.binding.port-offset (default 0). -->
<socket-binding-group name="standard-sockets"
                      default-interface="public"
                      port-offset="${jboss.socket.binding.port-offset:0}">

    <!-- Management -->
    <socket-binding name="management-http"
                    interface="management"
                    port="${jboss.management.http.port:9990}"/>
    <socket-binding name="management-https"
                    interface="management"
                    port="${jboss.management.https.port:9993}"/>

    <!-- Web -->
    <socket-binding name="ajp" port="${jboss.ajp.port:8009}"/>
    <socket-binding name="http" port="${jboss.http.port:8080}"/>
    <socket-binding name="https" port="${jboss.https.port:8443}"/>

    <!-- Binding named "remoting": fixed port 4447 on the public interface. -->
    <socket-binding name="remoting" interface="public" port="4447"/>

    <!-- IIOP -->
    <socket-binding name="iiop" interface="unsecure" port="3528"/>
    <socket-binding name="iiop-ssl" interface="unsecure" port="3529"/>

    <!-- Transactions -->
    <socket-binding name="txn-recovery-environment" port="4712"/>
    <socket-binding name="txn-status-manager" port="4713"/>

    <!-- Outbound -->
    <outbound-socket-binding name="mail-smtp">
        <remote-destination host="localhost" port="25"/>
    </outbound-socket-binding>
</socket-binding-group>

3. Create the JMS context XML file (jms-import-context.xml). This file is usually referenced from the context configuration in web.xml as a classpath resource:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context for the JMS file import: looks up the connection factory and
  the testImport queue in JNDI, wraps the factory with user credentials, and
  registers a listener container that delivers messages to
  ImportJmsMessageListener.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection factory obtained from JNDI. -->
    <bean id="jmsImportConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/RemoteConnectionFactory"/>
        <property name="expectedType" value="javax.jms.ConnectionFactory"/>
    </bean>

    <!-- Applies the application user's credentials to every connection created
         from the target factory.
         NOTE(review): the password is stored in plain text here; consider
         externalising it before using this file outside of a test setup. -->
    <bean id="jmsImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="jmsImportConnectionFactory"/>
        <property name="username" value="test"/>
        <property name="password" value="test@1975"/>
    </bean>

    <!-- Transaction manager bound to the credential-aware connection factory. -->
    <bean id="jmsImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
    </bean>

    <!-- Import queue obtained from JNDI. -->
    <bean id="jmsImportDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/queue/testImport"/>
    </bean>

    <!-- Listener receiving the imported file; dataImportProcessDispatcher and
         userDao are beans defined outside this file. -->
    <bean id="jmsImportMessageListener" class="eterra.xinterface.jms.ImportJmsMessageListener">
        <property name="dispatcher" ref="dataImportProcessDispatcher"/>
        <property name="userDao" ref="userDao"/>
    </bean>

    <!-- Single-consumer listener container wiring factory, transaction manager,
         destination and listener together. -->
    <bean id="jmsImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="1"/>
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
        <property name="transactionManager" ref="jmsImportTransactionManager"/>
        <property name="destination" ref="jmsImportDestination"/>
        <property name="messageListener" ref="jmsImportMessageListener"/>
    </bean>

</beans>

This shows the JNDI name of the remote connection factory (java:jboss/exported/jms/RemoteConnectionFactory), the WildFly user credentials, and the JNDI name of the remote destination (java:jboss/exported/jms/queue/testImport). The ImportJmsMessageListener class accepts JMS BytesMessage requests.

4. Configure jboss-service.xml to contain testImport queue

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <!-- Declares the testImport queue as an MBean.
         NOTE(review): org.jboss.mq.server.jmx.Queue looks like the legacy
         JBossMQ destination MBean; the testImport queue is already declared in
         the messaging-activemq subsystem of standalone-full.xml, so confirm
         that this descriptor is still required on WildFly. -->
    <mbean code="org.jboss.mq.server.jmx.Queue"
           name="jboss.mq.destination:service=Queue,name=testImport">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
</server>

5. Create ImportJmsMessageListener class. Besides file content we can send in the message some useful properties like FILE_NAME and FILE_TYPE:

/**
 * JMS listener that receives an imported file as a {@link BytesMessage}.
 *
 * NOTE(review): LOG, FILE_NAME, FILE_TYPE, fileName and fileType are used below
 * but not declared in this snippet, and the InitializingBean callback is not
 * shown either; they are assumed to live in the omitted part of the class.
 */
public class ImportJmsMessageListener implements MessageListener, InitializingBean {

 /**
  * Reads the file content carried by the incoming message.
  *
  * {@link MessageListener#onMessage(Message)} declares no checked exceptions,
  * so a {@link JMSException} raised while reading the message is rethrown as an
  * unchecked exception instead of being dropped silently.
  *
  * @param message the received JMS message
  * @throws IllegalStateException if the message cannot be read
  */
 @Override public void onMessage(Message message) {

   LOG.info("Read message file from JMS ...");
   final byte[] bytes;
   try {
     bytes = readBytesFromJMSMessage(message);
   } catch (JMSException e) {
     // Keep the original exception as the cause so the failure stays diagnosable.
     throw new IllegalStateException("Could not read file content from JMS message", e);
   }
   LOG.info("Message bytes read: " + bytes.length);
   // do what you need 
 }


 /**
  * Reads the body of a JMS {@link Message} as a byte[].
  * Also stores the FILE_NAME and FILE_TYPE string properties of the message in
  * the fileName and fileType fields.
  *
  * @param msg JMS {@link Message}
  * @return the byte[] read.
  * @throws JMSException if the message is not a {@link BytesMessage}, if one of
  *         the required properties is missing, if the body is too large to fit
  *         in a byte[], or if an error occurs while reading the body content.
  */
 public byte[] readBytesFromJMSMessage(final Message msg) throws JMSException {
    if (!(msg instanceof BytesMessage)) {
        throw new JMSException("Message is not a BytesMessage!");
    }
    BytesMessage bsMsg=(BytesMessage)msg;

    fileName = bsMsg.getStringProperty(FILE_NAME);
    if (fileName == null) {
        throw new JMSException(FILE_NAME + " property not specified!");
    }

    fileType = bsMsg.getStringProperty(FILE_TYPE);
    if (fileType == null) {
        throw new JMSException(FILE_TYPE + " property not specified!");
    }

    // Query the length once; it must fit into an int to size the array.
    final long bodyLength = bsMsg.getBodyLength();
    if (bodyLength >= Integer.MAX_VALUE) {
        throw new JMSException("Message body is too large[" + bodyLength + "].");
    }
    byte[] ba=new byte[(int)bodyLength];
    bsMsg.readBytes(ba);
    return ba;
 }
}

6. Create a test class. The most important point is that the remote client looks up the connection factory and the queue without the java:jboss/exported/ prefix that was used to define them in the server XML configuration files. To run the test class standalone, add jboss-client.jar from the wildfly/bin/client folder to the classpath:

/**
 * Standalone client that publishes a local file to the import queue as a JMS
 * {@link BytesMessage}, with the file name and file type sent as string
 * properties.
 */
public class JmsImportTest {

    // File to publish and the type reported to the listener.
    private static final String FILE_NAME = "myfile.xml";
    private static final String FILE_PATH = "C:\\Projects";
    private static final String FILE_TYPE = "MY_TYPE";

    // Remote naming settings and credentials of the application user.
    private String initialContextFactory = "org.jboss.naming.remote.client.InitialContextFactory";
    private String serverUrl = "remote://localhost:4447";
    private String userName = "test";
    private String password = "test@1975";

    // Names as seen by the remote client (no java:jboss/exported/ prefix).
    private String remoteConnectionfactoryUrl = "jms/RemoteConnectionFactory";
    private static String importQueueUrl = "jms/queue/testImport";

    /**
     * Looks up the connection factory and the given queue, then sends the
     * content of the configured file as a single bytes message.
     *
     * Failures are printed to standard error; the JMS connection and the
     * naming context are always released.
     *
     * @param queueName name of the destination queue to look up
     */
    public void sendMessage(String queueName) {

        InitialContext ic = null;
        Connection connection = null;

        try {

            System.out.println("Running " + JmsImportTest.class.toString() + " on current path " + new File(".").getAbsolutePath());
            System.out.println("Publishing " + FILE_NAME + " to destination '" + queueName + "'\n");

            final Properties initialContextProperties = new Properties();
            initialContextProperties.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
            initialContextProperties.put(Context.PROVIDER_URL, serverUrl);
            initialContextProperties.put(Context.SECURITY_PRINCIPAL, userName);
            initialContextProperties.put(Context.SECURITY_CREDENTIALS, password);

            ic = new InitialContext(initialContextProperties);
            final ConnectionFactory factory = (ConnectionFactory) ic.lookup(remoteConnectionfactoryUrl);
            final Queue queue = (Queue) ic.lookup(queueName);

            connection = factory.createConnection(userName, password);
            final Session session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            final MessageProducer msgProducer = session.createProducer(queue);

            final BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty("FILE_NAME", FILE_NAME);
            msg.setStringProperty("FILE_TYPE", FILE_TYPE);

            final Path fileLocation = Paths.get(FILE_PATH, FILE_NAME);
            final byte[] data = Files.readAllBytes(fileLocation);
            msg.writeBytes(data);

            msgProducer.send(msg);
            System.out.println("Bytes file message was published to server.");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also releases its session and producer.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            // The naming context was previously left open; release it as well.
            if (ic != null) {
                try {
                    ic.close();
                } catch (javax.naming.NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Publishes the configured file to the import queue.
     *
     * @param args ignored
     * @throws JMSException kept for compatibility with the original signature
     */
    public static void main(String[] args) throws JMSException {
        new JmsImportTest().sendMessage(importQueueUrl);
    }
}



No comments:

Post a Comment