First, the code makes an HTTP request with the GraphQL query to get the MQTT subscription URL, the client ID and the topic to subscribe to. Then, with that data, it tries to connect to the MQTT broker. The project uses Paho 1.1.0. When the code runs, I am able to get the MQTT URL, but when it tries to connect to the AWS AppSync broker it throws a NullPointerException:
Exception in thread "main" MqttException (0) - java.lang.NullPointerException, compiling:(/tmp/form-init1329614074410829203.clj:1:73)
at clojure.lang.Compiler.load(Compiler.java:7391)
at clojure.lang.Compiler.loadFile(Compiler.java:7317)
at clojure.main$load_script.invokeStatic(main.clj:275)
at clojure.main$init_opt.invokeStatic(main.clj:277)
at clojure.main$init_opt.invoke(main.clj:277)
at clojure.main$initialize.invokeStatic(main.clj:308)
at clojure.main$null_opt.invokeStatic(main.clj:342)
at clojure.main$null_opt.invoke(main.clj:339)
at clojure.main$main.invokeStatic(main.clj:421)
at clojure.main$main.doInvoke(main.clj:384)
at clojure.lang.RestFn.invoke(RestFn.java:421)
at clojure.lang.Var.invoke(Var.java:383)
at clojure.lang.AFn.applyToHelper(AFn.java:156)
at clojure.lang.Var.applyTo(Var.java:700)
at clojure.main.main(main.java:37)
Caused by: MqttException (0) - java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.receiveHandshakeResponse(WebSocketHandshake.java:133)
at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.execute(WebSocketHandshake.java:74)
at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule.start(WebSocketSecureNetworkModule.java:77)
at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:650)
... 1 more
While debugging, I realized that the broker actually returns a 403 Forbidden.
The code:
project.clj
(defproject gateway-clj "0.1.0-SNAPSHOT"
  :description "test"
  :url "http://example.com/FIXME"
  :main gateway-clj.core
  :debug true
  :uberjar {:aot :all}
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [clojurewerkz/urly "1.0.0"]
                 [clj-http "3.10.0"]
                 [cheshire "5.8.1"]
                 [org.eclipse.paho/org.eclipse.paho.client.mqttv3 "1.1.0"]])
core.clj
(ns gateway-clj.core
  (:refer-clojure :exclude [resolve])
  (:use clojurewerkz.urly.core)
  (:require
    [clj-http.client :as client]
    [cheshire.core :refer :all])
  (:import [org.eclipse.paho.client.mqttv3 MqttCallback MqttAsyncClient MqttConnectOptions MqttTopic MqttException MqttDeliveryToken MqttClientPersistence]
           [org.eclipse.paho.client.mqttv3.persist MemoryPersistence]
           [java.util Properties]
           [java.net URI URL]
           [javax.net SocketFactory]))

(def data {:api-url "https://xxxxx.appsync-api.us-east-2.amazonaws.com/graphql"
:request-conf {:headers {"X-Api-Key" "xxxxxx"}
:content-type :json
:body "{\"query\":\"mutation createRoom ($input: CreateRoomInput!) {\\n createRoom(input:$input){\\n id\\n }\\n}\\n\\nquery listMessages{\\n listMessages {\\n items{\\n id\\n content\\n }\\n }\\n}\\n\\nquery listRooms {\\n listRooms {\\n items {\\n id\\n createdAt\\n }\\n }\\n}\\n\\nsubscription OnCreateMessage {\\n onCreateMessage {\\n __typename\\n id\\n when\\n content\\n roomId\\n }\\n }\\n \",\"variables\":{\"input\":{\"id\":\"room2\"}},\"operationName\":\"OnCreateMessage\"}"}})

(defn ^MqttClientPersistence new-memory-persister
  "Returns new in-memory persister"
  []
  (MemoryPersistence.))

(defn- mqtt-callback
  "Returns an MqttCallback that logs connection loss, incoming messages and delivery confirmations"
  []
  (reify MqttCallback
    (connectionLost [_ cause]
      (println (str cause)))
    ;; Paho passes the topic as a String and the payload as a byte array
    (messageArrived [_ topic message]
      (println "Topic: " topic)
      (println "Message: " (String. (.getPayload message) "UTF-8")))
    (deliveryComplete [_ token]
      (println "*** DELIVERY COMPLETE ***"))))

(defn- mqtt-connect
  [topic broker-url client-id]
  (let [mqtt-conn-options (MqttConnectOptions.)
        url-map (as-map (url-like broker-url))
        mqtt-client (MqttAsyncClient. broker-url client-id (new-memory-persister))]
    (println client-id)
    (println "----")
    (println broker-url)
    (doto mqtt-conn-options
      (.setCleanSession false)
      (.setKeepAliveInterval 30)
      (.setMqttVersion 0))
    (.setCallback mqtt-client (mqtt-callback))
    (let [mqtt-token (.connect mqtt-client mqtt-conn-options)]
      (println mqtt-token)
      (.waitForCompletion mqtt-token)
      mqtt-client)))

(defn -main []
  (let [res (client/post (get data :api-url) (get data :request-conf))
        body (parse-string (get res :body) true)
        mqtt-data (first (get-in body [:extensions :subscription :mqttConnections]))
        {mqtt-url :url
         mqtt-client-id :client
         mqtt-topic :topic} mqtt-data
        mqtt-client (mqtt-connect mqtt-topic mqtt-url mqtt-client-id)]
    (.subscribe mqtt-client mqtt-topic 0)))
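
Given the 403 mentioned above, one thing that may be worth checking is exactly which URL is handed to Paho. The following is only a sketch: describe-broker-url is a hypothetical helper that is not part of the original code, and the URL in the usage line is a made-up placeholder. It splits the URL with java.net.URI so that the host, path and query string can be inspected separately. Whether the query string survives Paho's WebSocket handshake is a separate question that this sketch does not answer.

```clojure
(import '[java.net URI])

(defn describe-broker-url
  "Splits a broker URL into scheme, host, path and raw query string.
  Hypothetical debugging helper, not part of the original project."
  [^String broker-url]
  (let [uri (URI. broker-url)]
    {:scheme (.getScheme uri)
     :host   (.getHost uri)
     :path   (.getRawPath uri)
     :query  (.getRawQuery uri)}))

;; Placeholder URL; a real AppSync URL carries a much longer signed query string.
(println
  (describe-broker-url
    "wss://example-ats.iot.us-east-2.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Signature=abc123"))
```

If :query comes back nil or truncated for the real URL, the problem is in how the URL is extracted from the response rather than in the connection itself.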
With no mqttConnections in the GraphQL subscriptions, how can mqtt-data be anything but nil? – Dyne
mqtt-data is a map that I get from the previous HTTP POST request, (client/post (get data :api-url) (get data :request-conf)), and that is working well: I'm able to get the MQTT connection data (broker URL, client ID, topic). But when I try to subscribe to the MQTT broker with Paho, (.connect mqtt-client mqtt-conn-options) is the problem. – Honk
(get-in data [:request-conf :body]) only includes one mutation (createRoom), two queries (listMessages and listRooms) and one subscription (onCreateMessage), while in the next line you are using (get-in body [:extensions :subscription :mqttConnections]) to get mqtt-data, even though the subscription mqttConnections does not appear anywhere in the query. I would insert _ (println mqtt-data) before creating the connection just to make sure that you have the right URL. – Dyne
Printing mqtt-data shows: {:url wss://xxxx-ats.iot.us-east-2.amazonaws.com/mqtt?<AWSQueryStrings>, :topics [140021282151/kdlmr2cgkrdzdaf5ykga2rsdzi/onCreateMessage/], :client 5xqmiqcperdfehcdtyzpjvrihfu} – Honk
"WebSocket Response header: Incorrect upgrade." and the answers for that issue say to return to the 1.1.0 version. – Honk
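
One detail is visible in the printed map above: the key is :topics (a vector), while -main destructures :topic, so mqtt-topic would be nil by the time subscribe is called. This is separate from the 403 on connect. A minimal sketch of the destructuring follows, assuming the response always has that shape. The sample map and the connection-info helper are hypothetical, and every value is a placeholder.

```clojure
;; Hypothetical sample shaped like the printed mqtt-data; all values are placeholders.
(def sample-mqtt-data
  {:url    "wss://example-ats.iot.us-east-2.amazonaws.com/mqtt?X-Amz-Signature=placeholder"
   :topics ["123456789012/exampleapiid/onCreateMessage/"]
   :client "exampleclientid"})

(defn connection-info
  "Extracts the broker URL, client ID and first topic from one mqttConnections entry."
  [{:keys [url client topics]}]
  {:broker-url url
   :client-id  client
   :topic      (first topics)})

(println (connection-info sample-mqtt-data))
```

Taking (first topics) assumes a single subscription per connection; with several topics, each entry of the vector would need its own subscribe call.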