Update singleton HashMap using Google pub/sub
I have a use case where I initialise a HashMap that contains a set of lookup data (information about the physical location etc. of IoT devices). This lookup data serves as reference data for a second dataset, a PCollection containing the data that the IoT devices record. That PCollection comes from an Apache Beam pipeline that runs on Google Dataflow and reads from Google Cloud Pub/Sub.

When I process the PCollection (the device data), I link the Google Cloud pub/sub data to the related lookup entry in the HashMap.

I need to update the HashMap based on a second Pub/Sub subscription that pushes changes to its data. Here's my setup so far for getting a PCollection and doing a lookup using the HashMap:

HashMap -> contains pre-loaded lookup data (information about the IoT devices)

PCollection -> contains data from a pipeline dataflow (the data recorded by the IoT devices)

I'm generating a HashMap for the IoT device lookup data as a singleton:

import java.util.HashMap;

public class MyData {

    private static final MyData instance = new MyData();

    // reference data keyed by device id
    public final HashMap<String, DeviceReference> referenceData;

    private MyData() {
        HashMap<String, DeviceReference> myDataMap = new HashMap<>();
        // ... logic to populate the map

        this.referenceData = myDataMap;
    }

    public static MyData getInstance() {
        return instance;
    }
}
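
Note: a plain HashMap is not safe to mutate while pipeline threads are reading it. If updates will arrive while the pipeline runs, a ConcurrentHashMap variant of the same singleton (a minimal sketch, same shape as the class above) avoids corruption within a single JVM:

import java.util.concurrent.ConcurrentHashMap;

public class MyData {

    private static final MyData instance = new MyData();

    // ConcurrentHashMap tolerates concurrent reads and writes in one JVM
    public final ConcurrentHashMap<String, DeviceReference> referenceData =
        new ConcurrentHashMap<>();

    private MyData() {
        // ... logic to populate the map
    }

    public static MyData getInstance() {
        return instance;
    }
}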

I then use the HashMap in a different class, where I subscribe to updates to the data (messages that e.g. give me new data relating to entities already stored in the HashMap). I'm subscribing to the changes using Google Cloud Pub/Sub with Apache Beam:

HashMap<String, DeviceReference> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);

// subscribe to changes in data
PCollection<String> myDataUpdates = pipeLine.begin()
    .apply("Subscribe to data updates",
        PubsubIO.readStrings().fromTopic("myPubSubPath"));
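
For context, the per-element lookup on the device stream currently looks roughly like this (a minimal sketch; it assumes each message is, or can be reduced to, a device id):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Enriches each incoming device message with its reference-data entry.
public class EnrichWithReferenceDataFn extends DoFn<String, KV<String, DeviceReference>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        String deviceId = c.element(); // assumption: the message is a device id
        DeviceReference ref = MyData.getInstance().referenceData.get(deviceId);
        if (ref != null) {
            c.output(KV.of(deviceId, ref));
        }
    }
}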

What I want to do is to efficiently apply the data updates to the singleton HashMap (i.e. manipulate the HashMap based on my data subscription). How can I do this?

I have a limited understanding of Apache Beam: I only know how to transform pipeline data into another, separate PCollection, which I believe is Beam's core purpose. Is there a way to achieve what I need (updating a dataset based on a Pub/Sub subscription) within Apache Beam, or is there another way I can update the HashMap using Pub/Sub? (Polling for the data is not an option; it adds too much latency and cost. I need push-based updates via a subscription.)


The Google Cloud docs show a way of subscribing directly to Google Cloud Pub/Sub without going through an Apache Beam pipeline. This is promising as a potential solution, and relies on the following Maven dependency:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
</dependency>
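
If the dependency conflict described below can be resolved, the direct-client approach would look roughly like this (a minimal sketch; the project id and subscription name are placeholders, and the payload parsing is elided):

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class LookupDataListener {

    public static void main(String[] args) {
        ProjectSubscriptionName subscription =
            ProjectSubscriptionName.of("my-project-id", "myLookupUpdateSubscription");

        // called once per message: update the singleton map, then ack
        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            String payload = message.getData().toStringUtf8();
            // ... parse payload and put/remove entries in MyData.getInstance().referenceData
            consumer.ack();
        };

        Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
        subscriber.startAsync().awaitRunning();
    }
}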

I'm getting a conflict, though, with the following Maven dependency for Apache Beam:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>

The issue is documented in a separate question here - Maven conflict in Java app with google-cloud-core-grpc dependency. From what I can tell, it doesn't matter which version of the google-cloud-pubsub Maven artifact I use: the v2.5.0 Beam dependency and below appears to conflict with any current version of the Google dependency.

(I've raised this as an issue in the Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)
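
One workaround worth trying (untested, and the artifact to exclude is an assumption based on the linked question) is to exclude the conflicting transitive dependency from one side:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
  <exclusions>
    <!-- assumption: google-cloud-core-grpc is the conflicting transitive artifact -->
    <exclusion>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-core-grpc</artifactId>
    </exclusion>
  </exclusions>
</dependency>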


I'm currently investigating side inputs and Combine as a way to update the HashMap:

https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine

Example 10 shows a way that .getSideInputsMap() can be applied to a payload. I'm wondering if I can apply this somehow to my subscription to the lookup data changes. If I get a PCollection like this, I can't directly chain .getSideInputsMap() onto it:

deviceReferenceDataUpdates = pipeLine.begin()
    .apply("Get changes to the IoT device lookup data",
        PubsubIO.readMessages().fromTopic("IoT device lookup data"));

I've asked a separate question specifically about how I might be able to use .getSideInputsMap() - Apache Beam - how can I apply .getSideInputsMap to a subscription to a Google pub/sub?
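
Separately from .getSideInputsMap(), the documented Beam pattern for map-shaped lookup data is a side input built with View.asMap(). A minimal sketch, under assumptions: the topic name is a placeholder, the messages are "id,value" CSV pairs, and in a streaming pipeline the update stream would additionally need windowing/triggering before the view can refresh:

import java.util.Map;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

// build a map-valued side input from the lookup-update topic
PCollection<KV<String, String>> lookupKvs = pipeLine
    .apply("Read lookup updates",
        PubsubIO.readStrings().fromTopic("myLookupUpdateTopic"))
    .apply("Parse to KV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(line -> KV.of(line.split(",")[0], line.split(",")[1])));

final PCollectionView<Map<String, String>> lookupView = lookupKvs.apply(View.asMap());

// consult the side input while processing the main device stream
PCollection<String> enriched = myDataUpdates.apply("Enrich",
    ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Map<String, String> lookup = c.sideInput(lookupView);
            c.output(c.element() + " -> " + lookup.get(c.element()));
        }
    }).withSideInputs(lookupView));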

Hearsh answered 21/11, 2018 at 23:34 · Comments (10):
I'm a bit confused by this question. Cloud Pub/Sub is generally used for queuing messages in a distributed system, but a singleton HashMap is the opposite of a distributed system. Do you have a distributed system or don't you? If you want to scale to a distributed solution, you are probably better off abandoning the idea of a single HashMap and switching to some kind of string-keyed object storage instead. — Charade
Hi @Daniel. Thanks, you prompted me to better explain the question. I've added an intro to the OP to help clarify my use case. I'm not really understanding the benefit of having string-keyed object storage in another part of the system. Now that I've better explained the use case, does that change your assessment? I suspect you've highlighted a gap in my understanding of best practice in handling distributed data. Any references you can point me to are much appreciated :) — Hearsh
I still think you're fixated on the idea of a system where you can keep important state in a single in-memory data structure (a HashMap in this case). If you are dealing with a distributed system, you shouldn't assume that you have only one of anything. Rather than trying to listen to all mutations everywhere and update a single process's memory on a single machine, you want a distributed object storage system that serves the same purpose as your HashMap. You tagged this question "bigdata", but if you can fit all your data in memory on one machine, then you don't actually have Big Data. — Charade
Thanks @Daniel. I'm relatively new to Big Data and distributed system considerations. The idea of the HashMap is to enable rapid lookup of reference data that's related to pipeline data. The reference data may be tens of thousands of records at most, and I need to rapidly link it to the pipeline data. I can't poll for the reference data each time pipeline data is processed because of Google Cloud Platform cost constraints and scalability concerns, so I'm not clear what you're suggesting as an approach. I'm subscribing to the reference data updates via Google Cloud Platform Pub/Sub. — Hearsh
@Daniel, the pipeline is reading potentially billions of rows of data - that's the Big Data part. The lookup data is guaranteed to be available on the primary application thread, which is where the linking will take place. Apache Beam can distribute the pipeline data processing across multiple CPUs and machines; however, the pipeline data will then be linked to the reference (lookup) data on the main thread. — Hearsh
Sorry, this is the part that makes no sense to me: you say your processing of the pipeline will be distributed across multiple physical machines, and yet that processing will be "linked" to the reference data on "the main thread". Where is that main thread, and what kind of linkage are you proposing? Or is your question specifically about how to create that kind of linkage? — Charade
For what it's worth: if you have billions of records that all need to participate in mutations to a single chunk of data that's small enough to fit in memory, then I still feel like "Big Data" doesn't apply here. You don't need some fancy-pants distributed system. Just write a single-process program that chews through the billion records and modifies the in-memory hashtable as it goes. A few billion records really isn't that big; even if every record is one kilobyte of data, you still only have a few terabytes total, and that's easily handled by a single physical machine. — Charade
Thanks @Daniel. The use case is distributed environmental data that needs to be captured in real time across a broad geographic spread, for historic reporting purposes. I'm working on transforming and storing the initial data. The single source of mutation is doing a lot less than the transforms that are occurring on the IoT device data, so the current thinking is to distribute the initial read and transformation load, but perform the final mutation for each data record on a single instance by linking the transformed data to the 'device' data (data about the IoT device reading the data). — Hearsh
Take into account that a plain Java HashMap is not thread-safe; any attempt at concurrent modification is doomed up front. Even if you use a ConcurrentHashMap (or any other type of synchronization), you face a thread-contention performance problem, because memory is faster than disk but can still serve only a limited number of clients. As suggested in other comments, I would first consider the possibility of a fully distributed system. Why do you need a singleton Map? Each data collector will probably read data only from a limited number of devices, right? Can you consolidate that data later? — Inevasible
@SergMTen, if you make your comment an answer I'll award the bounty for it, as it's a useful suggestion. I don't have much time left to award the bounty. — Hearsh

I found a way of doing this within the Apache Beam framework, as follows (not fully tested).

Note - take into account the comment on the OP from @Serg M Ten that a better approach may be to consolidate the data later, instead of trying to join the lookup data as part of the transformation processing.


Singleton HashMap

See my answer here - Accessing a HashMap from a different class


Pipeline (on single thread, implemented in main)

// initialise the singleton HashMap containing the lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();

// subscribe to lookup-data updates and apply them via the transform below
PCollection<MyLookupData> lookupDataUpdateMessage = pipeLine.begin()
    .apply("Extract lookup update data",
        PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
    .apply("Transform lookup update data",
        ParDo.of(new TransformLookupData.TransformFn()));

Transform

import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;

import com.fasterxml.jackson.databind.ObjectMapper;

// plus imports for the LookupData singleton and the MyLookupData model

public class TransformLookupData {

    public static class TransformFn extends DoFn<String, MyLookupData> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            LookupData lookupData = LookupData.getInstance();

            try {
                // normalise the raw Pub/Sub payload to a JSON string
                String myLookupDataJson = new JSONObject(c.element()).toString();

                // deserialise the update into the lookup model
                ObjectMapper mapper = new ObjectMapper();
                MyLookupData myLookupDataUpdate =
                    mapper.readValue(myLookupDataJson, MyLookupData.class);

                String updatedLookupDataId = myLookupDataUpdate.id;

                // logic for HashMap updating, e.g. (the 'deleted' flag is
                // illustrative; use whatever your update messages carry):
                if (myLookupDataUpdate.deleted) {
                    lookupData.myHashMap.remove(updatedLookupDataId);
                } else {
                    lookupData.myHashMap.put(updatedLookupDataId, myLookupDataUpdate);
                }

                c.output(myLookupDataUpdate);
            }
            catch (Exception ex) {
                System.err.println("Error " + ex.getMessage());
            }
        }
    }
}

MyLookupData = Class that forms the model for the lookup data
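
For completeness, a minimal sketch of what that model might look like (the field names, including the 'deleted' flag used in the transform above, are illustrative; match them to your actual update messages):

// illustrative Jackson-friendly model for a lookup-data update message
public class MyLookupData {

    // unique id of the device the update refers to
    public String id;

    // true when the update message signals removal of the entry
    public boolean deleted;

    // ... other reference-data fields (location, name, etc.)

    public MyLookupData() {
        // no-arg constructor required by Jackson
    }
}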

Hearsh answered 30/11, 2018 at 1:13 · Comments (0)
