Spring boot - how to connect to external ActiveMQ master/slave cluster with failover URL
We have 2 ActiveMQ nodes on different VMs (e.g. hosts: amq1, amq2). They are linked as master/slave cluster.

We would like to connect to this cluster with failover protocol. How can this be done? Spring boot config:

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
        .setRelayHost(activeMQProperties.getRelayHost())
        .setRelayPort(activeMQProperties.getRelayPort());
  }
}

Unfortunately here we have only the possibility to set one host and one port. How could we set something like this:

failover:(stomp://amq1:61613,stomp://amq2:61613)

UPDATE: currently Spring Boot 2.3.5 is used

Boykin answered 26/1, 2018 at 12:45 Comment(4)
Looks like it is not supported at the moment: jira.spring.io/plugins/servlet/mobile#issue/SPR-12452 - Boykin
First, which version of Spring Boot? Second, the failover URL is an OpenWire URL, not a STOMP one. Third, it should now be supported just through application properties (or YAML), without a @Configuration bean. - Chambertin
At the moment we use Spring Boot 2.3.5. As far as I can see in the GitHub threads, a failover connection to a master/slave setup, where the slave takes over all connections when the master goes down, is still not solved: github.com/spring-projects/spring-framework/issues/17057 and github.com/spring-projects/spring-framework/issues/26169 . If I am missing something, please add a link to a sample or to documentation. - Boykin
What confused me is the usage of tcp:/, which is normally used by the OpenWire protocol. For STOMP you have to use something like this: failover:(stomp://amq1:61613,stomp://amq2:61613). It may depend on how things are configured on the AMQ side, but in our case, as we enabled both protocols, we use tcp: for OpenWire and stomp: for STOMP. - Chambertin
B
1

I tried the options you mentioned with "failover" in the connection string, but they did not work, and I found some threads saying that it is not even supported for STOMP.

So the final solution is a custom implementation, used with two ActiveMQ servers in a master/slave configuration.

Spring config (important part):

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

    // Assumed value in milliseconds; the original post does not show this constant.
    private static final long STOMP_SYSTEM_HEARTBEAT_INTERVAL = 10_000L;

    private final ActiveMQProperties activeMQProperties;

    // Used by the round-robin supplier below to select the server for the next connection attempt.
    // Atomic, because connection attempts are not guaranteed to run on a single thread.
    private final AtomicInteger index = new AtomicInteger(0);

    public WebsocketConfiguration(ActiveMQProperties activeMQProperties) {
        this.activeMQProperties = activeMQProperties;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
            .setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setTcpClient(createTcpClient());
    }

    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
            client -> client.remoteAddress(socketAddressSupplier()),
            new StompReactorNettyCodec());
    }

    // The returned supplier is invoked on every connection attempt and advances to the next server each time.
    private Supplier<? extends SocketAddress> socketAddressSupplier() {
        return () -> {
            int size = activeMQProperties.getActiveMQServerList().size();
            int current = index.updateAndGet(i -> (i + 1) % size);
            return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(current).getRelayHost(),
                activeMQProperties.getActiveMQServerList().get(current).getRelayPort());
        };
    }
}

The ActiveMQProperties:

activemq:                              
    activeMQServerList:
      -
        relayHost: host1
        relayPort: 61613
      -
        relayHost: host2
        relayPort: 61613

The trick is the supplier. It is called on every connection attempt, so when the master ActiveMQ goes down, the reconnect attempt gets the next configured server from the list and connects to that one.

It works properly.
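
The rotation itself can be isolated from Spring and checked on its own. The following is a minimal sketch, not the code from the configuration above: the class name RoundRobinAddressSupplier is hypothetical, and unlike the configuration above it starts with the first entry of the list rather than the second.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not part of Spring or ActiveMQ): hands out the
// configured broker addresses one after another, wrapping around at the end.
final class RoundRobinAddressSupplier implements Supplier<SocketAddress> {

    private final List<InetSocketAddress> addresses;
    // Index of the address returned by the next call to get().
    private final AtomicInteger next = new AtomicInteger(0);

    RoundRobinAddressSupplier(List<InetSocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public SocketAddress get() {
        // Read the current index and advance it in one atomic step.
        int current = next.getAndUpdate(i -> (i + 1) % addresses.size());
        return addresses.get(current);
    }
}
```

An instance of this class could be passed to client.remoteAddress(...) in createTcpClient() in place of the lambda, since that method accepts a Supplier of SocketAddress.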

Boykin answered 22/1, 2021 at 7:15 Comment(0)
N
0

In application.properties:

spring.activemq.broker-url=failover://tcp://your_host_IP:61616
Nodule answered 30/7, 2018 at 23:19 Comment(0)
P
0

I am not aware of the latest features of the STOMP protocol, but in general, in ActiveMQ the OpenWire protocol is defined with the prefix tcp:// (or ssl:// for SSL-secured transports). STOMP uses the stomp:// prefix. Here is a sample of what can be configured on the ActiveMQ server side:

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        <transportConnector name="websocket" uri="ws://0.0.0.0:61614"/>
    </transportConnectors>

Then, assuming the configuration is similar on both ActiveMQ nodes, if you need to use the failover protocol over OpenWire, you would put this in your Spring Boot config (YAML format):

spring:
  activemq:
    broker-url: failover:(tcp://amq1:61616,tcp://amq2:61616)

but for use with STOMP it would be:

spring:
  activemq:
    broker-url: failover:(stomp://amq1:61613,stomp://amq2:61613)

and for STOMP over WebSocket:

spring:
  activemq:
    broker-url: failover:(ws://amq1:61614,ws://amq2:61614)

Notes:

  • The port changes to match what is configured on ActiveMQ.
  • It seems you don't really need a @Configuration bean: the properties above should be enough to configure an ActiveMQ connection with failover.
  • As your code mentions WebSocket, I have listed all three transports your AMQ might be using.

For a better answer, it would help if you posted the transportConnectors section of your ActiveMQ configuration. If you do not have access to it, I would suggest asking your ActiveMQ administrator for the proper URLs and for the ways ActiveMQ can be accessed (other protocols can also be enabled; MQTT and AMQP are the most widely used).

Pomp answered 14/12, 2020 at 22:26 Comment(2)
Thanks for the detailed answer. I've tried it, and "broker-url=failover:(stomp://amq1:61613,stomp://amq2:61613)" does not work. Maybe it works for OpenWire, but not for STOMP. In the end I managed to solve it with my own implementation; see my answer for the details. - Boykin
That was my feeling. Some of our clients use STOMP, and they have also developed their own failover strategy (try node01, if that fails then node02). I was not aware of any change to this protocol, but "tcp" is certainly not the standard prefix for STOMP; it depends on what is configured and supported on the ActiveMQ side. - Chambertin
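
The "try node01, if error then 02" strategy mentioned in the comment above can be sketched in plain Java. This is only an illustration under assumptions: the class name BrokerProbe and the method firstReachable are hypothetical, and the approach relies on the passive node not accepting client connections until it takes over, which should be verified for the actual master/slave setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: returns the first candidate that accepts a TCP connection.
final class BrokerProbe {

    static Optional<InetSocketAddress> firstReachable(List<InetSocketAddress> candidates,
                                                      int timeoutMillis) {
        for (InetSocketAddress candidate : candidates) {
            // Resolve the host name now if the address was created unresolved.
            InetSocketAddress target = candidate.isUnresolved()
                    ? new InetSocketAddress(candidate.getHostString(), candidate.getPort())
                    : candidate;
            try (Socket socket = new Socket()) {
                socket.connect(target, timeoutMillis);
                return Optional.of(candidate);
            } catch (IOException e) {
                // Refused, timed out, or unresolvable: try the next node.
            }
        }
        return Optional.empty();
    }
}
```

A successful TCP connect only shows that something is listening on the port; it does not prove that the STOMP handshake will succeed.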
R
0

Pass a custom client to setTcpClient in your configureMessageBroker method to get a round-robin implementation, as shown below. The index switches to the next host on every connection attempt, because the address supplier given to the TCP client is invoked each time a connection is opened, including the reconnect after the current host becomes unavailable.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final CompositeData compositeData;
    private final int stompPort;
    private final AtomicInteger currentServerIndex = new AtomicInteger(-1);

    public WebSocketConfig(@Value("${jms.broker.url}") String brokerUrl,
                           @Value("${jms.stomp.port}") int stompPort) {
        this.compositeData = parseBrokerUri(brokerUrl);
        this.stompPort = stompPort;
    }


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // No setRelayHost/setRelayPort: with a custom TCP client, the client decides where to connect.
        config.enableStompBrokerRelay("/topic")
                .setTcpClient(new ReactorNettyTcpClient<>(
                        builder -> {
                            // The supplier runs on every connection attempt, so each attempt targets the next broker.
                            final TcpClient tcpClient = builder.remoteAddress(() -> new InetSocketAddress(nextBrokerUri().getHost(), stompPort));
                            return isSecure() ? tcpClient.secure() : tcpClient.noSSL();
                        }, new StompReactorNettyCodec())
                );
    }

    // Evaluated once, when the client is built; assumes all brokers in the URL use the same scheme.
    private boolean isSecure() {
        return "ssl".equals(compositeData.getComponents()[0].getScheme());
    }

    // Advances the index atomically and returns the broker to try next.
    private URI nextBrokerUri() {
        final URI[] components = compositeData.getComponents();
        return components[currentServerIndex.updateAndGet(i -> (i + 1) % components.length)];
    }

    private CompositeData parseBrokerUri(String brokerUri) {
        try {
            return URISupport.parseComposite(new URI(brokerUri));
        } catch (URISyntaxException e) {
            throw new RuntimeException("Error parsing broker uri", e);
        }
    }
}
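
The class above relies on ActiveMQ's URISupport to split the composite broker URL. If the ActiveMQ client library is not on the classpath, the same splitting can be approximated with plain Java. This is a minimal sketch: FailoverUrlParser is a hypothetical name, and it ignores any options placed after the closing parenthesis.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extracts the member URIs from a failover-style URL
// such as failover:(stomp://amq1:61613,stomp://amq2:61613).
final class FailoverUrlParser {

    static List<URI> parse(String url) {
        String body = url.trim();
        int open = body.indexOf('(');
        int close = body.lastIndexOf(')');
        if (open >= 0 && close > open) {
            // Keep only the comma-separated list between the parentheses.
            body = body.substring(open + 1, close);
        } else if (body.startsWith("failover:")) {
            // Forms without parentheses: failover:tcp://host:port or failover://tcp://host:port
            body = body.substring("failover:".length());
            if (body.startsWith("//")) {
                body = body.substring(2);
            }
        }
        List<URI> uris = new ArrayList<>();
        for (String part : body.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                uris.add(URI.create(trimmed));
            }
        }
        return uris;
    }
}
```

Each returned URI exposes getScheme(), getHost() and getPort(), which is all the configuration class above reads from the parsed components.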

Here are the imports, for completeness:

import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import reactor.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
Ricercare answered 5/8, 2021 at 13:28 Comment(0)
