RabbitMQ Java Sample Code

24 Sep 2018

Table of Contents


Consumer

Java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
import java.io.IOException;

public class RabbitConsumer {

    private final static String HOST = "localhost";
    private final static String VIRTUAL_HOST = "/governance/catalog";
    private final static String USER_NAME = "abc";
    private final static String PASSWORD = "abc123";
    private final static String CONNECTION_NAME = "abcClient01";
    private final static int PORT = 5672;
    private final static int TIMEOUT = 10000; // millis

    private final static String QUEUE_NAME = "create";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUAL_HOST);
        factory.setHandshakeTimeout(TIMEOUT);
        factory.setConnectionTimeout(TIMEOUT);

        Connection connection = factory.newConnection(CONNECTION_NAME);
        Channel channel = connection.createChannel();

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Called when a basic.deliver is received for this consumer.
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, callback);
    }
}

Producer

Java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.AMQP;
import java.util.concurrent.TimeoutException;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RabbitProducer {
   private final static String HOST = "localhost";
   private final static String VIRTUAL_HOST = "/governance/catalog";
   private final static String EXCHANGE_NAME = "core";
   private final static String USER_NAME = "abc";
   private final static String PASSWORD = "abc123";
   private final static String CONNECTION_NAME = "abcContent";
   private final static int PORT = 5672;
   private final static int TIMEOUT = 10000; // millis

   public static void main(String[] args) throws java.io.IOException {

       try {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost(HOST);
           factory.setUsername(USER_NAME);
           factory.setPassword(PASSWORD);
           factory.setPort(PORT);
           factory.setVirtualHost(VIRTUAL_HOST);
           factory.setHandshakeTimeout(TIMEOUT);
           factory.setConnectionTimeout(TIMEOUT);

           AMQP.BasicProperties properties = getAmqpProperties();

           Connection connection = factory.newConnection(CONNECTION_NAME);
           Channel channel = connection.createChannel();
           try {
               // Where the ABC message logic happens.
               System.out.println("Connected!");
               String message = "Hello 2";
               String routingKey = "xyz.create.hello";
               channel.basicPublish(EXCHANGE_NAME, routingKey, properties, message.getBytes());
           } catch (Exception ex) {
               // to do
           } finally {
               channel.close();
               connection.close();
               System.out.println("Disconnected!");
           }
       } catch (TimeoutException ex) {
           Logger.getLogger(RabbitProducer.class.getName()).log(Level.SEVERE, null, ex);
       }
   }

   private static AMQP.BasicProperties getAmqpProperties() {
       // https://stackoverflow.com/questions/18403623/rabbitmq-amqp-basicproperties-builder-values
       return new AMQP.BasicProperties(
               "application/json",  // contentType
               "UTF-8",             // contentEncoding
               null,                // headers
               null,                // deliveryMode
               null,                // priority
               null,                // correlationID
               null,                // replyTo
               null,                // expiration
               null,                // messageID
               new Date(),          // timestamp
               null,                // type
               null,                // userID
               "ContentRegistry",   // appID
               null                 // clusterID
       );
   }
}