AWS Lambda - How to get the topic name of data coming from AWS IOT
I'm testing AWS Lambda with an AWS IoT source. My MQTT clients publish to different topics: device A publishes data to streaming/A and device B publishes data to streaming/B, so I defined an AWS IoT SQL rule that selects messages from all devices on the topics streaming/+ and passes them to AWS Lambda. The problem is that I no longer know which device a message came from, because all I receive is an Array[Byte] and no topic information. If anyone knows how to access the MQTT payload together with the topic information, I'd be glad to hear it!

import java.io.{ByteArrayOutputStream, InputStream, OutputStream}
import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler}
/**
  * Created by alifirat on 24/04/17.
  */
class IOTConsumer extends RequestStreamHandler {

  val BUFFER_SIZE = 1024 * 4

  override def handleRequest(input: InputStream, output: OutputStream, context: Context): Unit = {
    val bytes = toByteArray(input)
    val logger= context.getLogger
    logger.log("Receive following thing :"  + new String(bytes))
    output.write(bytes)
  }

   /**
     * Reads and returns the rest of the given input stream as a byte array.
     * Caller is responsible for closing the given input stream.
     */
   def toByteArray(is: InputStream): Array[Byte] = {
     val output = new ByteArrayOutputStream()
     try {
       val b = new Array[Byte](BUFFER_SIZE)
       // Copy chunks until read() signals end of stream with -1.
       var n = is.read(b)
       while (n != -1) {
         output.write(b, 0, n)
         n = is.read(b)
       }
       output.toByteArray
     } finally {
       output.close()
     }
   }
}
Collywobbles answered 24/4, 2017 at 14:33 Comment(5)
What instructions are you following? github.com/aws/aws-iot-device-sdk-java seems relevant. - Excellence
I'm following this one: docs.aws.amazon.com/iot/latest/developerguide/… but the documentation doesn't specify the event type (which is normal, since it's Node.js). I'm using Scala/Java, so I need a type, and that type must give access to the source MQTT topic of the data. - Collywobbles
Ah, ok. But why do you think you need to process a java.io.InputStream? Where did that code come from? That doc seems to suggest that the MQTT message is JSON. - Excellence
Because I send a byte array to AWS IoT, so I expect to get this byte array back as the input of AWS Lambda. - Collywobbles
Oh, ok. I didn't realize you could do that. I don't know anything about IoT or MQTT. It sounds like you are using Lambda and SNS at a low level to pass bytes. I guess that's why you can't access any meta-information. It's only available if you use JSON or a POJO. - Excellence
E
33

I was looking for the same thing, and there is a way to achieve it. When writing your rule's SQL, you can use the topic() function to get the topic the message was sent to. Put this in the attribute section:

*, topic() as topic

so your final SQL will look like:

SELECT *, topic() as topic FROM 'one/of/my/+/topics'

Your payload will then contain a new attribute, topic, that you can parse within your Lambda function. More on this: https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html
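On the Lambda side, reading that attribute could look roughly like the sketch below. It assumes the handler receives the rule output as a java.util.Map (for example through a handler typed as RequestHandler[java.util.Map[String, AnyRef], ...] from aws-lambda-java-core), and that the device name is the last level of the topic, as in streaming/A from the question. The object and method names are hypothetical. The `data` attribute is also an assumption: it would only exist if the rule encoded a binary payload itself, for instance with `encode(*, 'base64') as data` instead of `*`.

```scala
import java.util.Base64

// Hypothetical helpers; none of these names come from an AWS library.
object TopicPayload {

  /** Reads the `topic` attribute added by `topic() as topic` in the rule SQL. */
  def topicOf(event: java.util.Map[String, AnyRef]): Option[String] =
    Option(event.get("topic")).collect { case s: String => s }

  /** Derives the device name from a topic such as "streaming/A" (its last level). */
  def deviceOf(topic: String): Option[String] =
    topic.split('/').lastOption.filter(_.nonEmpty)

  /** Decodes a Base64 `data` attribute, if the rule added one (assumption). */
  def dataOf(event: java.util.Map[String, AnyRef]): Option[Array[Byte]] =
    Option(event.get("data")).collect { case s: String => Base64.getDecoder.decode(s) }
}
```

Inside the handler, something like `TopicPayload.topicOf(event).flatMap(TopicPayload.deviceOf)` would then give the device that published the message, when the attribute is present.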

El answered 14/7, 2017 at 11:56 Comment(2)
Thanks for the solution, it really worked. If you could post a link to the documentation you found this in, it would be highly appreciated. - Corporeal
@Corporeal added a link to the documentation to my answer. - El
E
0

If your trigger is SNS messages, then I would just read the JSON. This will work in Scala:

import com.amazonaws.services.lambda.runtime.events.SNSEvent

import scala.collection.JavaConverters._

object Example extends LambdaApp  {

  /** Convert Java lists (or nulls!) to Scala lists */
  def safeList[A](xs: java.util.List[A]) =
    Option(xs).map(_.asScala).getOrElse(List.empty[A])

  /** Install the handler in AWS Lambda as `Example::handler`. */
  def handler(e: SNSEvent)  = {

    val rs = for {
      r <- safeList(e.getRecords)
    } yield {
      r.getSNS.getMessage
    }
    rs.asJava // Convert Scala list to Java.
  }
}

You'll need to have the following dependencies in your build.sbt:

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
  "com.amazonaws" % "aws-lambda-java-events" % "1.3.0"
)

If you're interested in the SNS topic name, then you could get it from the record's subscription ARN:

r.getEventSubscriptionArn

The AWS Lambda JDK library parses the JSON of the SNS message for you with jackson-core.
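Pulling the topic name out of that subscription ARN could be sketched as follows. This assumes the usual ARN layout arn:aws:sns:<region>:<account>:<topic>[:<subscription-id>]; the object and method names are hypothetical and not part of the AWS libraries.

```scala
// Hypothetical helper; not part of any AWS library.
object SnsArn {

  /** Returns the topic name from an SNS topic ARN or subscription ARN.
    * Assumes the layout arn:<partition>:sns:<region>:<account>:<topic>[:<subscription-id>].
    */
  def topicName(arn: String): Option[String] =
    arn.split(':') match {
      // The sixth colon-separated field is the topic name; anything after it is ignored.
      case Array("arn", _, "sns", _, _, topic, _*) if topic.nonEmpty => Some(topic)
      case _ => None
    }
}
```

In the handler above, this could be applied as `SnsArn.topicName(r.getEventSubscriptionArn)`.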

Excellence answered 24/4, 2017 at 15:0 Comment(1)
My trigger is an AWS IoT rule: I defined a rule that forwards messages coming from AWS IoT to an AWS Lambda function, so these are not SNS messages. - Collywobbles
