Friday 8 December 2017

Wildfly Configure Hornetq to receive bytes message from remote queue

The first step is to install HornetQ and configure a queue in /config/stand-alone/non-clustered/hornetq-jms.xml:

   <queue name="testQueue">
       <entry name="queue/testQueue"/>
       <durable>false</durable>
   </queue>

In the same file we can change the remote port specified in the netty connector.

Keep all properties inside a JmsContext:

/**
 * Holds everything needed to publish one file as a JMS bytes message:
 * the JNDI lookup settings, the server credentials, and the file payload
 * together with its metadata.
 */
public class JmsContext {

    /** Name of the message string property that carries the file name. */
    public static final String FILE_NAME_ATTRIBUTE = "FILE_NAME";
    /** Name of the message string property that carries the file code. */
    public static final String FILE_CODE_ATTRIBUTE = "FILE_CODE";

    // JNDI lookup settings
    private String initialContextFactory;
    private String remoteConnectionfactoryUrl;
    private String importQueueUrl;

    // Server location and credentials
    private String serverUrl;
    private String userName;
    private String password;

    // Payload and its metadata
    private String fileName;
    private String fileCode;
    private byte[] bytes;
    // getters and setters
}

Create a JmsHelper class to send the message:

import org.apache.log4j.Logger;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.util.Properties;

/**
 * Publishes the file described by a {@link JmsContext} to a remote JMS queue
 * as a {@link BytesMessage}.
 */
public class JmsHelper {

    private static final Logger LOG = Logger.getLogger(JmsHelper.class);

    /**
     * Looks up the connection factory and the queue through JNDI, then sends
     * the context's bytes as a single message with the file name and file code
     * attached as string properties.
     * <p>
     * Failures are logged rather than propagated, so the caller receives no
     * exception when publishing fails.
     *
     * @param jmsContext JNDI settings, credentials and payload of the message to send
     */
    public static void sendMessage(JmsContext jmsContext) {

        InitialContext ic = null;
        Connection connection = null;

        try {

            LOG.info("Running " + JmsHelper.class.toString() +
                     " on current path " + new File(".").getAbsolutePath());
            LOG.info("Publishing " + jmsContext.getFileName() +
                     " to destination '" + jmsContext.getImportQueueUrl() + "'\n");

            final Properties initialContextProperties = new Properties();
            initialContextProperties.put(Context.INITIAL_CONTEXT_FACTORY,
                     jmsContext.getInitialContextFactory());
            initialContextProperties.put(Context.PROVIDER_URL,
                     jmsContext.getServerUrl());
            initialContextProperties.put(Context.SECURITY_PRINCIPAL,
                     jmsContext.getUserName());
            initialContextProperties.put(Context.SECURITY_CREDENTIALS,
                     jmsContext.getPassword());

            ic = new InitialContext(initialContextProperties);
            LOG.info("Initial context created.");
            final ConnectionFactory factory = (ConnectionFactory) ic.lookup(
                              jmsContext.getRemoteConnectionfactoryUrl());
            LOG.info("Connection factory " + jmsContext.getRemoteConnectionfactoryUrl() +
                              " acquired.");

            final Queue queue = (Queue) ic.lookup(jmsContext.getImportQueueUrl());
            LOG.info("Found queue: " + jmsContext.getImportQueueUrl());

            connection = factory.createConnection(jmsContext.getUserName(),
                             jmsContext.getPassword());
            // Non-transacted session: the message is sent as soon as send() returns.
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer msgProducer = session.createProducer(queue);

            final BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty(JmsContext.FILE_NAME_ATTRIBUTE,
                             jmsContext.getFileName());
            msg.setStringProperty(JmsContext.FILE_CODE_ATTRIBUTE,
                             jmsContext.getFileCode());

            msg.writeBytes(jmsContext.getBytes());

            msgProducer.send(msg);
            LOG.info("Bytes file message was published to server.");

        } catch (Exception e) {
            LOG.error("Failed to publish the bytes message.", e);
        } finally {
            // Closing the connection also closes the session and producer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOG.warn("Could not close the JMS connection.", e);
                }
            }
            if (ic != null) {
                try {
                    ic.close();
                } catch (javax.naming.NamingException e) {
                    LOG.warn("Could not close the initial context.", e);
                }
            }
        }
    }
}

Create the test class that connects directly to the HornetQ server (note the server URL and the initial context factory class):

import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Standalone client that reads a file from disk and publishes it to the
 * HornetQ test queue through {@link JmsHelper}.
 */
public class JmsHornetqTest {

    private String fileName = "...";
    private String filePath = "...";
    private String fileCode = "E1";
    private String serverUrl = "jnp://localhost:1099";//"remote://localhost:5445";
    private String initialContextFactory = "org.jnp.interfaces.NamingContextFactory";
    private String remoteConnectionFactoryUrl = "/ConnectionFactory";
    private String hornetqUser = "guest";
    private String hornetqPassword = "guest";
    private String hornetqImportQueueUrl = "/queue/testQueue";

    /**
     * Fills a {@link JmsContext} from the fields above plus the file content
     * and hands it to {@link JmsHelper#sendMessage(JmsContext)}.
     * Any failure (for example an unreadable file) is printed to stderr.
     */
    public void sendMessage() {

        try {

            JmsContext jmsContext = new JmsContext();
            jmsContext.setServerUrl(serverUrl);
            jmsContext.setUserName(hornetqUser);
            jmsContext.setPassword(hornetqPassword);
            jmsContext.setRemoteConnectionfactoryUrl(remoteConnectionFactoryUrl);
            jmsContext.setInitialContextFactory(initialContextFactory);
            jmsContext.setImportQueueUrl(hornetqImportQueueUrl);
            jmsContext.setFileName(fileName);
            jmsContext.setFileCode(fileCode);
            jmsContext.setBytes(getBytes());

            JmsHelper.sendMessage(jmsContext);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the whole file {@code filePath/fileName} into memory.
     *
     * @return the file content
     * @throws IOException if the file cannot be read
     */
    private byte[] getBytes() throws IOException {
        Path fileLocation = Paths.get(filePath, fileName);
        return Files.readAllBytes(fileLocation);
    }

    public static void main(String[] args) throws JMSException {
        new JmsHornetqTest().sendMessage();
    }
}

Now the only thing left to do is to configure (in jms-context.xml) HornetQ in our Wildfly server to listen for and receive the bytes message. We do not specify the destination itself, only the destination name from the HornetQ server and a destination resolver:

<bean id="jmsHQImportConnectionFactory" lazy-init="true" 
            class="org.hornetq.jms.client.HornetQConnectionFactory">
    <constructor-arg value="true"/>
    <constructor-arg>
        <list>
            <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
                <constructor-arg>
                    <map key-type="java.lang.String" value-type="java.lang.Object">                        
                        <entry key="host" value="localhost"/>
                        <entry key="port" value="5445"/>
                    </map>
                </constructor-arg>
            </bean>

            <!-- more beans if we have backup servers -->
        </list>
    </constructor-arg>

    <property name="reconnectAttempts" value="-1"/>
    <property name="failoverOnInitialConnection" value="true"/>
    <property name="initialConnectAttempts" value="15"/>
    <property name="consumerWindowSize" value="0"/>
</bean>
<bean id="jmsHQImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="jmsHQImportConnectionFactory" />
    <property name="username" value="guest" />
    <property name="password" value="guest" />
</bean>
<bean id="jmsHQImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
</bean>
<bean id="jmsHQImportDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="cache" value="true"/>
    <property name="fallbackToDynamicDestination" value="true"/>
</bean>
<bean id="jmsHQImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
    <property name="transactionManager" ref="jmsHQImportTransactionManager"/>
    <property name="destinationName" value="testQueue"/>
    <property name="destinationResolver" ref="jmsHQImportDestinationResolver"/>
    <property name="messageListener" ref="jmsImportMessageListener"/>
</bean>

Wednesday 29 November 2017

Wildfly Remote JMS

I wanted to send/import a file using JMS to a Wildfly server application. To be able to do this I needed to do the following:

1. Add user "test" with password "test@1975" as an Application User to the Wildfly server using the add-user script from the bin folder. We must answer yes when asked whether the user will be used for one AS process to connect to another.

2. Configure standalone-full.xml

The messaging-activemq subsystem is defined here:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
    <server name="default">
        <security-setting name="#">
            <role name="guest" delete-non-durable-queue="true" 
                create-non-durable-queue="true" 
                consume="true" send="true"/>
        </security-setting>
        <address-setting name="#" message-counter-history-day-limit="10" 
                page-size-bytes="2097152" max-size-bytes="10485760" 
                expiry-address="jms.queue.ExpiryQueue" 
                dead-letter-address="jms.queue.DLQ"/>
        <http-connector name="http-connector" endpoint="http-acceptor" 
                socket-binding="http"/>
        <http-connector name="http-connector-throughput" 
                endpoint="http-acceptor-throughput" socket-binding="http">
            <param name="batch-delay" value="50"/>
        </http-connector>
        <in-vm-connector name="in-vm" server-id="0"/>
        <http-acceptor name="http-acceptor" http-listener="default"/>
        <http-acceptor name="http-acceptor-throughput" http-listener="default">
            <param name="batch-delay" value="50"/>
            <param name="direct-deliver" value="false"/>
        </http-acceptor>
        <in-vm-acceptor name="in-vm" server-id="0"/>
        <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
        <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>        
        <jms-queue name="testImport" entries="java:jboss/exported/jms/queue/testImport"/>
        <connection-factory name="InVmConnectionFactory" 
                entries="java:/ConnectionFactory" connectors="in-vm"/>
        <connection-factory name="RemoteConnectionFactory" 
                entries="java:jboss/exported/jms/RemoteConnectionFactory" 
                connectors="http-connector"/>
        <pooled-connection-factory name="activemq-ra" transaction="xa" 
                entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" 
                connectors="in-vm"/>
    </server>
</subsystem>
The most important things here are the definition of testImport queue and the definition of RemoteConnectionFactory.

Because we use remote JMS, we must have a remoting connector defined in the remoting subsystem:

<subsystem xmlns="urn:jboss:domain:remoting:3.0">
    <endpoint/>
    <http-connector name="http-remoting-connector" connector-ref="default" 
                security-realm="ApplicationRealm"/>
    <connector name="remoting-connector" socket-binding="remoting" 
                security-realm="ApplicationRealm"/>
</subsystem>

This socket binding must also be present in the socket-binding-group, where the remote port, 4447, is specified:

<socket-binding-group name="standard-sockets" default-interface="public" 
                port-offset="${jboss.socket.binding.port-offset:0}">
    <socket-binding name="management-http" interface="management" 
                port="${jboss.management.http.port:9990}"/>
    <socket-binding name="management-https" interface="management" 
                port="${jboss.management.https.port:9993}"/>
    <socket-binding name="ajp" port="${jboss.ajp.port:8009}"/>
    <socket-binding name="http" port="${jboss.http.port:8080}"/>
    <socket-binding name="https" port="${jboss.https.port:8443}"/>
    <socket-binding name="remoting" interface="public" port="4447"/>
    <socket-binding name="iiop" interface="unsecure" port="3528"/>
    <socket-binding name="iiop-ssl" interface="unsecure" port="3529"/>
    <socket-binding name="txn-recovery-environment" port="4712"/>
    <socket-binding name="txn-status-manager" port="4713"/>
    <outbound-socket-binding name="mail-smtp">
        <remote-destination host="localhost" port="25"/>
    </outbound-socket-binding>
</socket-binding-group>

3. Create jms context xml file (jms-import-context.xml). This is in general added in web.xml context classpath:

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"       
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <bean name="jmsImportConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/RemoteConnectionFactory"/>
        <property name="expectedType" value="javax.jms.ConnectionFactory"/>
    </bean>

    <bean id="jmsImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="jmsImportConnectionFactory" />
        <property name="username" value="test" />
        <property name="password" value="test@1975" />
    </bean>

    <bean id="jmsImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
    </bean>

    <bean name="jmsImportDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:jboss/exported/jms/queue/testImport"/>
    </bean>

    <bean id="jmsImportMessageListener" class="eterra.xinterface.jms.ImportJmsMessageListener">
        <property name="dispatcher" ref="dataImportProcessDispatcher"/>
        <property name="userDao" ref="userDao"/>
    </bean>

    <bean id="jmsImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="1"/>
        <property name="connectionFactory" ref="jmsImportConnectionFactoryProxy"/>
        <property name="transactionManager" ref="jmsImportTransactionManager"/>
        <property name="destination" ref="jmsImportDestination"/>
        <property name="messageListener" ref="jmsImportMessageListener"/>
    </bean>

</beans>

Here we can see the usage of the remote connection factory URL (java:jboss/exported/jms/RemoteConnectionFactory), the Wildfly user credentials, and the name of the remote destination (java:jboss/exported/jms/queue/testImport). The class ImportJmsMessageListener accepts JMS BytesMessage requests.

4. Configure jboss-service.xml to contain testImport queue

<?xml version="1.0" encoding="UTF-8"?><server>   
    <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=testImport">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
</server>

5. Create ImportJmsMessageListener class. Besides file content we can send in the message some useful properties like FILE_NAME and FILE_TYPE:

/**
 * JMS listener that accepts a {@link BytesMessage} carrying a file body
 * together with its FILE_NAME and FILE_TYPE string properties.
 */
public class ImportJmsMessageListener implements MessageListener, InitializingBean {

    private static final Logger LOG = Logger.getLogger(ImportJmsMessageListener.class);

    /** Message string property holding the name of the transferred file. */
    private static final String FILE_NAME = "FILE_NAME";
    /** Message string property holding the type of the transferred file. */
    private static final String FILE_TYPE = "FILE_TYPE";

    /** File name taken from the most recently read message. */
    private String fileName;
    /** File type taken from the most recently read message. */
    private String fileType;

    // NOTE: the dispatcher / userDao setters and afterPropertiesSet() are not shown here.

    /**
     * Reads the file bytes out of the incoming message.
     *
     * @param message the message delivered by the listener container
     * @throws IllegalStateException if the message cannot be read as a file message
     */
    @Override
    public void onMessage(Message message) {

        LOG.info("Read message file from JMS ...");
        final byte[] bytes;
        try {
            bytes = readBytesFromJMSMessage(message);
        } catch (JMSException e) {
            // onMessage() cannot throw checked exceptions: rethrow unchecked, keeping
            // the cause, so the listener container can apply its own error handling.
            throw new IllegalStateException("Could not read file from JMS message", e);
        }
        LOG.info("Message bytes read: " + bytes.length);
        // do what you need
    }

    /**
     * Reads the body of a JMS {@link Message} as a byte[] and remembers the
     * file name and file type carried in its properties.
     *
     * @param msg JMS {@link Message}
     * @return the byte[] read.
     * @throws JMSException if the message is not a {@link BytesMessage}, a required
     *         property is missing, the body is too large for a byte[], or an error
     *         occurs while trying to read the body content.
     */
    public byte[] readBytesFromJMSMessage(final Message msg) throws JMSException {
        if (!(msg instanceof BytesMessage)) {
            throw new JMSException("Message is not a BytesMessage!");
        }
        BytesMessage bsMsg = (BytesMessage) msg;

        fileName = bsMsg.getStringProperty(FILE_NAME);
        if (fileName == null) {
            throw new JMSException(FILE_NAME + " property not specified!");
        }

        fileType = bsMsg.getStringProperty(FILE_TYPE);
        if (fileType == null) {
            throw new JMSException(FILE_TYPE + " property not specified!");
        }

        // The body is copied into a single array, so it must fit an int length.
        final long bodyLength = bsMsg.getBodyLength();
        if (bodyLength >= Integer.MAX_VALUE) {
            throw new JMSException("Message body is too large[" + bodyLength + "].");
        }
        byte[] ba = new byte[(int) bodyLength];
        bsMsg.readBytes(ba);
        return ba;
    }
}

6. Create a test class. The most important thing here is how the remote connection factory URL and the queue URL are used: without the java:jboss/exported/ prefix used to define them in the server XML configuration files. To run the test class standalone, you have to add jboss-client.jar from the wildfly/bin/client folder to the classpath:

/**
 * Standalone client that publishes a file from disk to a Wildfly JMS queue
 * as a bytes message with FILE_NAME and FILE_TYPE string properties.
 */
public class JmsImportTest {

    private static final String FILE_NAME = "myfile.xml";
    private static final String FILE_PATH = "C:\\Projects";
    private static final String FILE_TYPE = "MY_TYPE";

    private static final String INITIAL_CONTEXT_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory";
    private static final String SERVER_URL = "remote://localhost:4447";
    private static final String USER_NAME = "test";
    private static final String PASSWORD = "test@1975";
    private static final String REMOTE_CONNECTION_FACTORY_URL = "jms/RemoteConnectionFactory";
    private static final String IMPORT_QUEUE_URL = "jms/queue/testImport";

    /**
     * Looks up the remote connection factory and the given queue through JNDI
     * and sends the configured file to that queue.
     * Failures are printed to stderr rather than propagated.
     *
     * @param queueName JNDI name of the destination queue
     */
    public void sendMessage(String queueName) {

        InitialContext ic = null;
        Connection connection = null;

        try {

            System.out.println("Running " + JmsImportTest.class.toString() + " on current path " + new File(".").getAbsolutePath());
            System.out.println("Publishing " + FILE_NAME + " to destination '" + queueName + "'\n");

            final Properties initialContextProperties = new Properties();
            initialContextProperties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
            initialContextProperties.put(Context.PROVIDER_URL, SERVER_URL);
            initialContextProperties.put(Context.SECURITY_PRINCIPAL, USER_NAME);
            initialContextProperties.put(Context.SECURITY_CREDENTIALS, PASSWORD);

            ic = new InitialContext(initialContextProperties);
            final ConnectionFactory factory = (ConnectionFactory) ic.lookup(REMOTE_CONNECTION_FACTORY_URL);

            final Queue queue = (Queue) ic.lookup(queueName);

            connection = factory.createConnection(USER_NAME, PASSWORD);
            final Session session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            final MessageProducer msgProducer = session.createProducer(queue);

            final BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty("FILE_NAME", FILE_NAME);
            msg.setStringProperty("FILE_TYPE", FILE_TYPE);

            final Path fileLocation = Paths.get(FILE_PATH, FILE_NAME);
            final byte[] data = Files.readAllBytes(fileLocation);
            msg.writeBytes(data);

            msgProducer.send(msg);
            System.out.println("Bytes file message was published to server.");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes the session and producer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            // Release the naming context as well; the original left it open.
            if (ic != null) {
                try {
                    ic.close();
                } catch (javax.naming.NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws JMSException {
        new JmsImportTest().sendMessage(IMPORT_QUEUE_URL);
    }
}



Thursday 16 November 2017

IDE Golden Rule

Do you  have a very large project composed of many projects / modules and stuff? You think it's easy to create the project in IntelliJ or other IDE? Think twice.

Practically you can create the project from existing sources if you already took them from your VCS or directly from VCS. After you add first Project / Module by selecting the pom.xml file you have to do some more things:

- be sure you use the correct maven settings xml file (in File/Settings/Maven configuration)
- be sure to select the needed profiles

Then you move on adding other Modules from existing sources. (There is no option in Intellij to add modules from VCS). You should also select the pom.xml file and not the directory to avoid the hassle.

This is just a path to follow. But what I recently saw was a lot of errors in IntelliJ about unresolved classes, even though the classes were there. I tried File/Invalidate caches & restart, but with no success.
I started going through all the modules, and it seemed there were problems with dependencies.

Finally I found the problem. A lot of modules/projects were built with a custom maven script, so a lot of libraries, generated sources and other artifacts were polluting the actual local repository. When I created the project, IntelliJ found all of these and added them to dependencies and paths, generating internal conflicts.

So be aware. And have in mind an IDE Golden Rule:

"When you create a project from existing sources, be sure that what you have there is just a clean sources folder without anything else inside".


Friday 28 July 2017

Java 8: Use Lambda to create a Pivot-like result

Let's assume we have a Java POJO Person with fields year, teamId and sum, and we want to create a pivot which shows the total sum for every year and teamId, like:
-----------------------------------------------
             |     Test|  Develop|  Deploy|
-----------------------------------------------
     2015|    1450|       5800|       870|
-----------------------------------------------
     2016|    2500|     10000|           0|
-----------------------------------------------
     2017|          0|       3500|       250|
-----------------------------------------------

Define a simple CSV file which keeps the data:
year,teamId,sum
2015,Test,480
2015,Develop,800
2015,Deploy,300
.....
To create our pivot, first we need to read the data into an appropriate structure, Map<YearTeam, List<Person>>, where YearTeam is also a simple POJO containing year and teamId. For this, the Collectors.groupingBy method will really help. Because YearTeam is used as the map key, it must implement equals and hashCode.
Pattern pattern = Pattern.compile(",");
// Named "map" because the following snippets read it under that name, and
// assigned exactly once so their lambdas are allowed to capture it.
Map<YearTeam, List<Person>> map;
try (BufferedReader in = new BufferedReader(new FileReader(fileName))) {
    map = in
        .lines()
        .skip(1) // skip the CSV header line
        .map(line -> {
            // Columns: year, teamId, sum
            String[] arr = pattern.split(line);
            return new Person(
                    Integer.parseInt(arr[0]),
                    arr[1],
                    Integer.parseInt(arr[2]));
        })
        .collect(Collectors.groupingBy(x -> new YearTeam(x.getYear(), x.getTeamId())));
}
Having this map, we want to print first the teams header and then every year with total sum. First step is to obtain the set of teams:
// Distinct team ids, sorted by the TreeSet; they are printed as the pivot's column headers.
Set<String> teams = map.keySet().stream()
        .map(YearTeam::getTeamId)
        .collect(Collectors.toCollection(TreeSet::new));

We can define some useful parameters for pretty printing:
/** Width, in characters, of one pivot cell, not counting its trailing delimiter. */
public static final int COLUMN_WIDTH = 9;
/** Printed at the end of every cell. */
public static final String COLUMN_DELIMITER = "|";
/** Repeated to draw the horizontal rule between rows. */
public static final String LINE_DELIMITER = "-";
And some print utility methods:
/**
 * Ends the current output line and prints a horizontal rule beneath it.
 *
 * @param columns number of columns the rule has to span
 */
private static void printLineDelimiter(int columns) {
    System.out.println();
    // Each column takes COLUMN_WIDTH characters plus one for its delimiter.
    final int ruleLength = columns * (COLUMN_WIDTH + 1);
    final String rule = String.join("", Collections.nCopies(ruleLength, LINE_DELIMITER));
    System.out.println(rule);
}

/**
 * Prints the header row: an empty first cell followed by one
 * right-aligned cell per team.
 *
 * @param teams team names, printed in the set's iteration order
 */
private static void printTeamsHeader(Set<String> teams) {
    final String cellFormat = "%" + (COLUMN_WIDTH + 1) + "s";
    System.out.printf(cellFormat, COLUMN_DELIMITER);
    for (String team : teams) {
        System.out.printf(cellFormat, team + COLUMN_DELIMITER);
    }
}

/**
 * Prints the first cell of a data row: the year, right-aligned, followed by the delimiter.
 *
 * @param year the year labelling the row
 */
private static void printYear(int year) {
    final String cell = year + COLUMN_DELIMITER;
    final String cellFormat = "%" + (COLUMN_WIDTH + 1) + "s";
    System.out.printf(cellFormat, cell);
}

/**
 * Prints one total right-aligned in a cell of COLUMN_WIDTH characters,
 * without a trailing delimiter.
 *
 * @param total the value to print
 */
private static void printTotal(long total) {
    final String cellFormat = "%" + COLUMN_WIDTH + "s";
    System.out.printf(cellFormat, total);
}
With these we can start to print teams header:
// One column per team plus the leading column that holds the year.
int columns = teams.size()+1;

// Header row, framed by a horizontal rule above and below.
printLineDelimiter(columns);
printTeamsHeader(teams);
printLineDelimiter(columns);
Next we need to print actual data. To compute the total sum we will use Collectors.summingLong :
// Collected into a TreeSet so the rows are printed in ascending year order;
// a plain Collectors.toSet() gives no ordering guarantee.
Set<Integer> years = map
        .keySet()
        .stream()
        .map(x -> x.getYear())
        .collect(Collectors.toCollection(TreeSet::new));

// One row per year, one cell per team.
for (Integer y : years) {
    printYear(y);
    for (String t : teams) {
        YearTeam yt = new YearTeam(y, t);
        List<Person> persons = map.get(yt);
        // A (year, team) pair with no persons has no map entry and counts as 0.
        long total = persons == null ? 0 :
                persons.stream()
                       .collect(Collectors.summingLong(Person::getSum));
        printTotal(total);
        System.out.print(COLUMN_DELIMITER);
    }
    printLineDelimiter(columns);
}