Connect AWS SQS to Apache-Flink
Asked Answered
L

2

6

Why is AWS SQS not a default connector for Apache Flink? Is there some technical limitation to doing this? Or was it just something that didn't get done? I want to implement this, any pointers would be appreciated

Leake answered 8/7, 2018 at 11:44 Comment(2)
Did you ever end up writing one?Biyearly
Any word on this? I'd be interested in this too.Zacynthus
O
2

Probably too late for an answer to the original question... I wrote a SQS consumer as a SourceFunction, using the Java Messaging Service library for SQS:

SQSConsumer extends RichParallelSourceFunction<String> {
   private volatile boolean isRunning;
   private transient AmazonSQS sqs;
   private transient SQSConnectionFactory connectionFactory;
   private transient ExecutorService consumerExecutor;

   @Override
   public void open(Configuration parameters) throws Exception {
      String region = ...
      AWSCredentialsProvider credsProvider = ...
      // may be use a blocking array backed thread pool to handle surges?
      consumerExecutor = Executors.newCachedThreadPool();
      ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
      this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
            .withClientConfiguration(clientConfig)
            .withExecutorFactory(()->consumerExecutor).build();
      this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
      this.isRunning = true;
   }

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
      SQSConnection connection = connectionFactory.createConnection();
      // ack each msg explicitly
      Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
      Queue queue = session.createQueue(<queueName>);
      MessageConsumer msgConsumer = session.createConsumer(queue);
      msgConsumer.setMessageListener(msg -> {
          try {
              String msgId = msg.getJMSMessageID();
              String evt = ((TextMessage) msg).getText();
              ctx.collect(evt);
              msg.acknowledge();
          } catch (JSMException e) {
              // log and move on the next msg or bail with an exception
              // have a dead letter queue is configured so this message is not lost
              // msg is not acknowledged so it may be picked up again by another consumer instance
          }
      };
      // check if we were canceled
      if (!isRunning) {
          return;
      }
      connection.start();
      while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
          // keep waiting
      }
  }
            

  @Override
  public void cancel() {
      isRunning = false;
      // this method might be called before the task actually starts running
      if (sqs != null) {
          sqs.shutdown();
      }
      if(consumerExecutor != null) {
           consumerExecutor.shutdown();
           try {
               consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); 
           } catch (Exception e) {
               //log e
           }
      }
   }

   @Override
   public void close() throws Exception {
       cancel();
       super.close();
   }
}

Note if you are using a standard SQS queue you may have to de-dup the messages depending on whether exactly-once guarantees are required.

Reference: Working with JMS and Amazon SQS

Ophthalmologist answered 27/10, 2021 at 15:48 Comment(1)
Thank you for the example, quite useful. May I ask what credsProvider you use? I am getting java.io.NotSerializableException: com.amazonaws.auth.profile.ProfileCredentialsProvider for ProfileCredentialsProvider.Troublemaker
K
0

At the moment, there is no connector for AWS SQS in Apache Flink. Have a look at the already existing connectors. I assume you already know about this, and would like to give some pointers. I was also looking for an SQS connector recently and found this mail thread.

Apache Kinesis Connector is somewhat similar to what you can implement on this. See whether you can get a start on this using this connector.

Kaz answered 6/5, 2019 at 8:54 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.