Why is AWS SQS not a default connector for Apache Flink? Is there some technical limitation to doing this, or was it just something that didn't get done? I want to implement this; any pointers would be appreciated.
Probably too late for an answer to the original question, but I wrote an SQS consumer as a SourceFunction, using the JMS (Java Message Service) library for SQS:
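import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.PredefinedClientConfigurations;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class SQSConsumer extends RichParallelSourceFunction<String> {
    private volatile boolean isRunning;
    // clients are not serializable, so they are transient and created in open()
    private transient AmazonSQS sqs;
    private transient SQSConnectionFactory connectionFactory;
    private transient ExecutorService consumerExecutor;

    @Override
    public void open(Configuration parameters) throws Exception {
        String region = ...
        AWSCredentialsProvider credsProvider = ...
        // maybe use a thread pool backed by a bounded blocking queue to handle surges?
        consumerExecutor = Executors.newCachedThreadPool();
        ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
        this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
                .withClientConfiguration(clientConfig)
                .withExecutorFactory(() -> consumerExecutor).build();
        this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
        this.isRunning = true;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        SQSConnection connection = connectionFactory.createConnection();
        // ack each msg explicitly
        Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
        Queue queue = session.createQueue(<queueName>);
        MessageConsumer msgConsumer = session.createConsumer(queue);
        msgConsumer.setMessageListener(msg -> {
            try {
                String msgId = msg.getJMSMessageID(); // handy for logging or de-duplication
                String evt = ((TextMessage) msg).getText();
                // the listener runs on a JMS thread, so emit under the checkpoint lock
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(evt);
                }
                msg.acknowledge();
            } catch (JMSException e) {
                // log and move on to the next msg, or bail with an exception
                // make sure a dead letter queue is configured so this message is not lost
                // msg is not acknowledged, so it may be picked up again by another consumer instance
            }
        });
        // check if we were canceled
        if (!isRunning) {
            return;
        }
        connection.start();
        // block until cancel() shuts the executor down
        while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
            // keep waiting
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        // this method might be called before the task actually starts running
        if (sqs != null) {
            sqs.shutdown();
        }
        if (consumerExecutor != null) {
            consumerExecutor.shutdown();
            try {
                consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (Exception e) {
                // log e
            }
        }
    }

    @Override
    public void close() throws Exception {
        cancel();
        super.close();
    }
}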
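The comment in open() wonders about a pool that copes with surges. As a minimal sketch (an assumption on my part, not something the original setup is known to use), a ThreadPoolExecutor with a bounded ArrayBlockingQueue and CallerRunsPolicy caps both the thread count and the backlog; when the queue is full, the submitting thread runs the task itself, which slows submission down. `BoundedPoolDemo`, `newBoundedPool`, and the sizes are hypothetical names and values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {
    // Hypothetical sizing parameters; tune them for the queue's traffic.
    static ExecutorService newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                            // fixed number of worker threads
                60L, TimeUnit.SECONDS,                       // keep-alive, unused unless core threads may time out
                new ArrayBlockingQueue<>(queueCapacity),     // bounded backlog of waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
    }

    // Submits `tasks` trivial jobs to a small pool and returns how many completed.
    static int runTasks(int tasks) {
        ExecutorService pool = newBoundedPool(2, 4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(100));
    }
}
```

An executor built this way could be handed to withExecutorFactory in place of the cached pool; whether that suits the SQS client's workload is untested here.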
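Note: if you are using a standard SQS queue, you may have to deduplicate the messages, depending on whether exactly-once guarantees are required. Standard queues provide at-least-once delivery, so the same message can occasionally arrive more than once.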
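One way to do that is to remember recently seen message IDs and drop repeats before emitting. The sketch below is only an illustration under stated assumptions: `RecentIdFilter` is a hypothetical helper, it keeps IDs in memory only (so they are lost on restart unless you move them into Flink state), and it bounds memory by evicting the oldest ID once the capacity is exceeded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: remembers the most recent message IDs, evicting the oldest.
class RecentIdFilter {
    private final Map<String, Boolean> seen;

    RecentIdFilter(final int capacity) {
        // Insertion-ordered map: the eldest entry is the ID that was inserted first.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is offered, false while it is still remembered.
    synchronized boolean firstTime(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}

class DedupDemo {
    public static void main(String[] args) {
        RecentIdFilter filter = new RecentIdFilter(1000);
        for (String id : new String[] {"m1", "m2", "m1", "m3", "m2"}) {
            if (filter.firstTime(id)) {
                System.out.println("emit " + id);
            } else {
                System.out.println("skip duplicate " + id);
            }
        }
    }
}
```

In the listener above, the check could wrap the emit, for example by calling ctx.collect(evt) only when filter.firstTime(msgId) returns true. This catches redelivery of the same SQS message; if a producer sends the same payload twice, the IDs differ and you would need a key derived from the payload instead.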
Reference: Working with JMS and Amazon SQS
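A word on why the client fields in the source are transient and built in open(): Flink serializes the source function with Java serialization when it ships the job to the workers, so any non-transient field whose type is not Serializable fails with java.io.NotSerializableException. The following stdlib-only sketch reproduces that behaviour; `FakeClient`, `BrokenSource`, and `SafeSource` are hypothetical stand-ins, not Flink or AWS classes, and the region string is just an example value.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a client or credentials provider: NOT Serializable.
class FakeClient {
    final String region;
    FakeClient(String region) { this.region = region; }
}

// Holds the client in a regular field, so serializing the function fails.
class BrokenSource implements Serializable {
    private final FakeClient client = new FakeClient("us-east-1");
}

// Keeps only serializable configuration; the client is transient and built in open().
class SafeSource implements Serializable {
    private final String region;
    private transient FakeClient client;

    SafeSource(String region) { this.region = region; }

    // Plays the role of RichFunction#open: called on the worker before processing starts.
    void open() { this.client = new FakeClient(region); }

    String clientRegion() { return client == null ? null : client.region; }
}

class SerializationDemo {
    // Returns true if Java serialization accepts the object, false if it rejects it.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BrokenSource serializable: " + isSerializable(new BrokenSource()));
        System.out.println("SafeSource serializable: " + isSerializable(new SafeSource("us-east-1")));
    }
}
```

So if a credentials provider is passed in through the constructor and stored in a plain field, the job submission is likely to fail; passing plain strings (region, profile name) and constructing the provider inside open() avoids that.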
Which credsProvider do you use? I am getting java.io.NotSerializableException: com.amazonaws.auth.profile.ProfileCredentialsProvider for ProfileCredentialsProvider. – Troublemaker
At the moment, there is no connector for AWS SQS in Apache Flink. Have a look at the existing connectors. I assume you already know this, so here are some pointers. I was also looking for an SQS connector recently and found this mail thread.
Flink's Amazon Kinesis connector is somewhat similar to what you would implement for SQS. See whether you can use it as a starting point.