Using apoc.periodic.commit to insert an endless json stream into neo4j
Asked Answered
T

2

1

I'm new to NEO4J, and am attempting to insert data from a JSON stream into the database. The root element of the JSON stream is an array, each element in the array is an object that contains a key/value, and an array.

sample of JSON stream:

[
{
 "access_point":4864834, 
 "objects": [ 
  {"class_id":10, "name":"iphone", "snr":0.557461}, 
  {"class_id":7, "name":"android", "snr":0.822390}, 
  {"class_id":7, "name":"android", "snr":0.320850}, 
  {"class_id":2, "name":"pc", "snr":0.915604}
 ] 
}, 
{
 "access_point":4864835, 
 "objects": [ 
  {"class_id":12, "name":"iphone", "snr":0.268736}, 
  {"class_id":10, "name":"android", "snr":0.585927}, 
  {"class_id":7, "name":"android", "snr":0.821383}, 
  {"class_id":2, "name":"pc", "snr":0.254997}, 
  {"class_id":7, "name":"android", "snr":0.326559}, 
  {"class_id":2, "name":"pc", "snr":0.905473}
 ] 
}, 

Because it is an endless stream, I need to do batch commits as the apoc.load.json will never reach the end of the array.

so far my query looks like:

CALL apoc.periodic.commit("
CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value as accesspoint MERGE(f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
FOREACH(object IN accesspoint.objects | MERGE (f)-[r:OBSERVED]->(:Object {class_id:object.class_id, name:object.name, access_point_id:accesspoint.access_point}))",
{limit:10, batchSize: 10});

This of course is not referencing a JSON stream, but static JSON in my github.

Is there a way to tell it to persist after n elements in the array?

Tolley answered 6/1, 2020 at 17:59 Comment(0)
T
1

Because I have access to the source of the data, we were able to modify how it outputs the JSON. We switched it to JSONL (line delineated JSON) where each line of JSON is essentially treated as it's own JSON document. I did utilize a lot of @cybersam answer, and also Michael Hunger, so thank you.

changed the source JSON to JSONL like the following:


{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}

and my neo4j cypher query looked like the following:

CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
MERGE(f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
FOREACH(obj IN frames.objects |
  MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.ap_id}))",
{batchSize: 1});
Tolley answered 8/1, 2020 at 21:7 Comment(0)
A
4

It looks like you should be using apoc.periodic.iterate instead of apoc.periodic.commit. For example:

CALL apoc.periodic.iterate(
  "CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value AS ap",
  "MERGE(f:Accesspoint {id: ap.access_point, name: ap.access_point})
   FOREACH(obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 10});

apoc.periodic.iterate is documented to support the batchSize option, which processes N executions of the second Cypher statement in a single transaction.

Amygdalin answered 6/1, 2020 at 19:38 Comment(3)
Thanks for the reply. This does work well on my static code, but for endless streaming code with no close "]" at the end of the root array, it essentially just spins as it attemps to do it in one batch. I've modified my static JSON to simulate thisTolley
apoc.load.json should stream the input JSON if it is a list (of maps), instead of waiting for the entire list to download before executing the second statement. Is it possible that the service supplying the JSON data is very slow?Amygdalin
Try apoc.load.jsonArrayPhotoreconnaissance
T
1

Because I have access to the source of the data, we were able to modify how it outputs the JSON. We switched it to JSONL (line delineated JSON) where each line of JSON is essentially treated as it's own JSON document. I did utilize a lot of @cybersam answer, and also Michael Hunger, so thank you.

changed the source JSON to JSONL like the following:


{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}

and my neo4j cypher query looked like the following:

CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
MERGE(f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
FOREACH(obj IN frames.objects |
  MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.ap_id}))",
{batchSize: 1});
Tolley answered 8/1, 2020 at 21:7 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.