Connect to AWS AppSync MQTT broker using Paho

First, the code makes an HTTP request with the GraphQL query to get the MQTT subscription URL, the client ID, and the topic to subscribe to. Then, using that data, it tries to connect to the MQTT broker with Paho 1.1.0. When the code runs, I do get the MQTT URL, but the attempt to connect to the AWS AppSync broker throws a NullPointerException:

Exception in thread "main" MqttException (0) - java.lang.NullPointerException, compiling:(/tmp/form-init1329614074410829203.clj:1:73)
        at clojure.lang.Compiler.load(Compiler.java:7391)
        at clojure.lang.Compiler.loadFile(Compiler.java:7317)
        at clojure.main$load_script.invokeStatic(main.clj:275)
        at clojure.main$init_opt.invokeStatic(main.clj:277)
        at clojure.main$init_opt.invoke(main.clj:277)
        at clojure.main$initialize.invokeStatic(main.clj:308)
        at clojure.main$null_opt.invokeStatic(main.clj:342)
        at clojure.main$null_opt.invoke(main.clj:339)
        at clojure.main$main.invokeStatic(main.clj:421)
        at clojure.main$main.doInvoke(main.clj:384)
        at clojure.lang.RestFn.invoke(RestFn.java:421)
        at clojure.lang.Var.invoke(Var.java:383)
        at clojure.lang.AFn.applyToHelper(AFn.java:156)
        at clojure.lang.Var.applyTo(Var.java:700)
        at clojure.main.main(main.java:37)
Caused by: MqttException (0) - java.lang.NullPointerException
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
        at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.receiveHandshakeResponse(WebSocketHandshake.java:133)
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.execute(WebSocketHandshake.java:74)
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule.start(WebSocketSecureNetworkModule.java:77)
        at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:650)
        ... 1 more

While debugging, I realized that the broker actually returns a 403 Forbidden.
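To make that failure explicit instead of surfacing as a NullPointerException, the status line of the handshake response can be checked before any upgrade headers are read. The following is only a minimal sketch: `handshake-status` and `upgrade-accepted?` are hypothetical helpers (not part of Paho), and it assumes the first line of the server's response has already been captured as a string.

```clojure
;; Hypothetical helpers, not part of the Paho API.
(defn handshake-status
  "Returns the numeric HTTP status from a status line such as
  \"HTTP/1.1 403 Forbidden\", or nil if the line is missing or malformed."
  [status-line]
  (when-let [[_ code] (re-find #"^HTTP/\d\.\d\s+(\d{3})" (or status-line ""))]
    (Integer/parseInt code)))

(defn upgrade-accepted?
  "A WebSocket upgrade succeeds only with status 101 (Switching Protocols)."
  [status-line]
  (= 101 (handshake-status status-line)))

;; Example calls:
(println (handshake-status "HTTP/1.1 403 Forbidden"))
(println (upgrade-accepted? "HTTP/1.1 403 Forbidden"))
```

A 403 at this stage means the broker rejected the request before the WebSocket upgrade, so the MQTT connect packet is never sent.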

The code:

project.clj

(defproject gateway-clj "0.1.0-SNAPSHOT"
  :description "test"
  :url "http://example.com/FIXME"
  :main gateway-clj.core
  :debug true
  :profiles {:uberjar {:aot :all}}
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [clojurewerkz/urly "1.0.0"]
                 [clj-http "3.10.0"]
                 [cheshire "5.8.1"]
                 [org.eclipse.paho/org.eclipse.paho.client.mqttv3 "1.1.0"]])

core.clj

(ns gateway-clj.core
  (:refer-clojure :exclude [resolve])
  (:use clojurewerkz.urly.core)
  ;; ns clauses must be keywords (:require / :import)
  (:require
   [clj-http.client :as client]
   [cheshire.core :refer :all])
  (:import [org.eclipse.paho.client.mqttv3 MqttCallback MqttAsyncClient MqttConnectOptions MqttTopic MqttException MqttDeliveryToken MqttClientPersistence]
           [org.eclipse.paho.client.mqttv3.persist MemoryPersistence]
           [java.util Properties]
           [java.net URI URL]
           [javax.net SocketFactory]))

(def data {:api-url "https://xxxxx.appsync-api.us-east-2.amazonaws.com/graphql"
           :request-conf {:headers {"X-Api-Key" "xxxxxx"}
                          :content-type :json
                          :body "{\"query\":\"mutation createRoom ($input: CreateRoomInput!) {\\n  createRoom(input:$input){\\n    id\\n  }\\n}\\n\\nquery listMessages{\\n  listMessages {\\n    items{\\n      id\\n      content\\n    }\\n  }\\n}\\n\\nquery listRooms {\\n  listRooms {\\n    items {\\n      id\\n      createdAt\\n    }\\n  }\\n}\\n\\nsubscription OnCreateMessage {\\n    onCreateMessage {\\n      __typename\\n      id\\n      when\\n      content\\n      roomId\\n    }\\n  }\\n \",\"variables\":{\"input\":{\"id\":\"room2\"}},\"operationName\":\"OnCreateMessage\"}"}})


(defn ^MqttClientPersistence new-memory-persister
  "Returns new in-memory persister"
  []
  (MemoryPersistence.))

(defn- mqtt-callback
  "Returns an MqttCallback that logs connection loss, incoming messages and delivery confirmations"
  []
  (reify MqttCallback
    (connectionLost [_ cause]
      (println (.toString cause)))
    ;; Paho passes the topic as a String and the payload as a byte array
    (messageArrived [_ topic message]
      (println "Topic: " topic)
      (println "Message: " (String. (.getPayload message))))
    (deliveryComplete [_ token]
      (println "*** DELIVERY COMPLETE ***"))))

(defn- mqtt-connect
  [topic broker-url client-id]
  (let [mqtt-conn-options (MqttConnectOptions.)
        url-map   (as-map (url-like broker-url))
        mqtt-client (MqttAsyncClient. broker-url client-id (new-memory-persister))]
    (println client-id)
    (println "----")
    (println broker-url)
    (doto mqtt-conn-options
      (.setCleanSession false)
      (.setKeepAliveInterval 30)
      (.setMqttVersion 0))

    (.setCallback mqtt-client (mqtt-callback))
    (let [mqtt-token (.connect mqtt-client mqtt-conn-options)]
      (println mqtt-token)
      (. mqtt-token waitForCompletion)
      mqtt-client)))

(defn -main [] 
  (let [res (client/post (get data :api-url) (get data :request-conf))
        body (parse-string (get res :body) true)
        mqtt-data (first (get-in body [:extensions :subscription :mqttConnections]))

        {mqtt-url :url
         mqtt-client-id :client
         mqtt-topic  :topic} mqtt-data
        mqtt-client (mqtt-connect mqtt-topic mqtt-url  mqtt-client-id)]
     (. mqtt-client subscribe mqtt-topic 0)))
Honk answered 31/5, 2019 at 20:33 Comment(6)
Possibly unrelated: there seems to be no mqttConnections in GraphQL subscriptions, so how can mqtt-data be anything but nil? - Dyne
I'm not sure I understood your comment, but mqtt-data is a map that I get from the previous HTTP POST request (client/post (get data :api-url) (get data :request-conf)), and that part works well: I'm able to get the MQTT connection data (broker URL, client ID, topic). The problem occurs when I try to connect to the MQTT broker with Paho, at (.connect mqtt-client mqtt-conn-options). - Honk
The GraphQL query in the code (i.e. (get-in data [:request-conf :body])) only includes one mutation (createRoom), two queries (listMessages and listRooms) and one subscription (onCreateMessage), while in the next line you are using (get-in body [:extensions :subscription :mqttConnections]) to get mqtt-data, although mqttConnections does not appear anywhere in the query. I would insert _ (println mqtt-data) before creating the connection just to make sure that you have the right URL. - Dyne
Printing mqtt-data shows: {:url wss://xxxx-ats.iot.us-east-2.amazonaws.com/mqtt?<AWSQueryStrings>, :topics [140021282151/kdlmr2cgkrdzdaf5ykga2rsdzi/onCreateMessage/], :client 5xqmiqcperdfehcdtyzpjvrihfu} - Honk
Then I suppose that the NPE may be due to a bug in the library (see github.com/aws/aws-iot-device-sdk-java/issues/5, github.com/eclipse/paho.mqtt.java/issues/234). Have you considered using a newer version of the mqttv3 client? - Dyne
I did, actually, but with newer versions I get "WebSocket Response header: Incorrect upgrade.", and the answers for that issue suggest going back to version 1.1.0. - Honk
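One detail is visible in the printed mqtt-data above: the map carries a :topics vector, while -main destructures a :topic key, so mqtt-topic would be nil by the time subscribe is called. This is separate from the 403 and would only matter once the connection succeeds. A minimal sketch of pulling the values out follows; `connection-params` is a hypothetical helper, and the sample map simply copies the redacted values from the comment.

```clojure
;; Sample data copied from the printed output in the comments (values redacted there).
(def mqtt-data
  {:url    "wss://xxxx-ats.iot.us-east-2.amazonaws.com/mqtt?<AWSQueryStrings>"
   :topics ["140021282151/kdlmr2cgkrdzdaf5ykga2rsdzi/onCreateMessage/"]
   :client "5xqmiqcperdfehcdtyzpjvrihfu"})

;; Hypothetical helper: normalizes one mqttConnections entry.
(defn connection-params
  "Extracts the broker URL, client ID and first topic from a connection map."
  [{:keys [url client topics]}]
  {:url       url
   :client-id client
   :topic     (first topics)})

(println (:topic mqtt-data))                     ; the key used in -main
(println (:topic (connection-params mqtt-data))) ; first entry of :topics
```

Taking only the first topic is an assumption made for brevity; if the response lists several topics, each one would need its own subscribe call.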