I am working on a project where I need to consume a lot of records and then send them to another system, which uses ZeroMQ.
Here is the flow:
- Store all incoming records in a ConcurrentHashMap (CHM) from multiple threads. Records arrive at a very high rate.
- From a background thread, which runs every 30 seconds, send these records from the CHM to the ZeroMQ servers.
- After sending each record to the ZeroMQ servers, also add it to a retry bucket so that it can be retried after a certain amount of time if no acknowledgment has been received for it.
- We also have a poller runnable thread that receives acknowledgments from the ZeroMQ servers telling us which records have been received; as soon as an acknowledgment comes back, I delete that record from the retry bucket so that it doesn't get retried.
- Even if some records are sent twice it's OK, but it is good to minimize this.
I am not sure of the best way to minimize this in the scenario below.
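To make the retry-bucket idea concrete, here is a minimal sketch of the pattern, illustrative only and assuming a Guava Cache keyed by a per-record address; my real classes follow below:

import java.util.Map;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Sketch of the retry-bucket pattern, not the real implementation.
public class RetryBucketSketch {

    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().build();

    // after a record is sent, remember it until an acknowledgment arrives
    void onSent(long address, byte[] payload) {
        retryBucket.put(address, payload);
    }

    // the acknowledgment for this address arrived, so the record must never be retried again
    void onAck(long address) {
        retryBucket.invalidate(address);
    }

    // periodic retry pass: anything still in the bucket has not been acknowledged yet
    void retryPass() {
        for (Map.Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
            // resend entry.getValue() to the ZeroMQ servers here
        }
    }
}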
Below is my Processor class, in which the add() method will be called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. In the constructor of the Processor class, I start the background thread, which runs every 30 seconds, to push records from that same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class shown further below:
Processor
public class Processor {

    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
    }

    private void validateAndSend(final int partition,
            final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            .........
            .........
            SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
        }
        // calling again with remaining values
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }

    // called by multiple threads to populate dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        // store records in dataHolderByPartitionReference in a thread safe way
    }
}
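For completeness, here is one possible shape for the two bodies I have elided above. This is only a sketch, assuming a per-partition ConcurrentLinkedQueue created on demand, one task per partition, and the usual java.util.concurrent imports; it is not my actual code:

// add(): the putIfAbsent idiom lets concurrent producers safely create the per-partition queue
public void add(final int partition, final DataHolder holder) {
    ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> currentMap =
            dataHolderByPartitionReference.get();
    ConcurrentLinkedQueue<DataHolder> queue = currentMap.get(partition);
    if (queue == null) {
        ConcurrentLinkedQueue<DataHolder> newQueue = new ConcurrentLinkedQueue<DataHolder>();
        queue = currentMap.putIfAbsent(partition, newQueue);
        if (queue == null) {
            queue = newQueue;
        }
    }
    queue.add(holder);
}

// validateAndSendAllPartitions(): one task per partition, since there are only 5-6 partitions at most
private void validateAndSendAllPartitions(
        final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
    if (dataHolderByPartition.isEmpty()) {
        return;
    }
    final ExecutorService pool = Executors.newFixedThreadPool(dataHolderByPartition.size());
    for (final Map.Entry<Integer, ConcurrentLinkedQueue<DataHolder>> entry : dataHolderByPartition.entrySet()) {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                validateAndSend(entry.getKey(), entry.getValue());
            }
        });
    }
    pool.shutdown(); // let in-flight sends finish; no new tasks accepted after this pass
}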
And below is my SendToZeroMQ class, which sends a record to a set of ZeroMQ servers and retries it depending on whether an acknowledgment is delivered:
- First it sends the record to the ZeroMQ servers.
- Then it adds the same record to the retryBucket, from which it will be retried later on, depending on whether an acknowledgment was received or not.
- In the same class, I start a background thread that runs every 1 minute to resend the records that are still in the retry bucket.
- The same class also starts the ResponsePoller thread, which keeps running forever to see which of the previously sent records have been acknowledged; as soon as records are acknowledged, the ResponsePoller thread removes them from the retryBucket so that they do not get retried. (A small sketch of how the record address ties these pieces together follows the class.)
SendToZeroMQ
public class SendToZeroMQ {

    // do I need two ScheduledExecutorServices here, or is one sufficient to start both of my threads?
    private final ScheduledExecutorService executorServicePoller = Executors
            .newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
            .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
            .build();

    private static class Holder {
        private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
    }

    public static SendToZeroMQ getInstance() {
        return Holder.INSTANCE;
    }

    private SendToZeroMQ() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
                    executeAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedByteArray);
        boolean sent = msg.send(socket);
        msg.destroy();
        // add to retry bucket
        retryBucket.put(address, encodedByteArray);
        return sent;
    }

    public boolean executeAsync(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
        long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
        byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    private Map<Long, byte[]> encode(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        // this address will be unique always
        long address = TestUtils.getAddress();
        Frame frame = new Frame(............);
        byte[] packedByteArray = frame.serialize();
        // this map will always have one entry in it
        return ImmutableMap.of(address, packedByteArray);
    }

    public void removeFromRetryBucket(final long address) {
        retryBucket.invalidate(address);
    }
}
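To spell out how the pieces above fit together: the unique address generated in encode() is both the key under which the payload sits in the retryBucket and the value the ResponsePoller later recovers from the acknowledgment frame via TestUtils.getAddress(frame.getData()), so exactly the right entry gets invalidated. A hypothetical usage sketch (the literal address and payload are made up for illustration):

SendToZeroMQ sender = SendToZeroMQ.getInstance();
long address = 42L;                          // in the real flow this comes from TestUtils.getAddress()
byte[] encodedRecord = new byte[] {1, 2, 3}; // stand-in for a real serialized Frame

sender.executeAsync(address, encodedRecord); // sends the record and puts it into retryBucket
// ... every minute the retry thread resends whatever is still in retryBucket ...
// ... once the acknowledgment carrying this address arrives, ResponsePoller calls:
sender.removeFromRetryBucket(address);       // the record will not be retried any more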
And below is my ResponsePoller class, which waits for acknowledgments for all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is deleted from the retry bucket so that it doesn't get retried.
public class ResponsePoller implements Runnable {

    private static final Random random = new Random();
    private static final int listenerPort = 8076;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);

        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

        while (!Thread.currentThread().isInterrupted()) {
            // Tick once per second, pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getAddress(frame.getData());
                            // remove from retry bucket since we got the acknowledgment for this record
                            SendToZeroMQ.getInstance().removeFromRetryBucket(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
Question:
From a design perspective, what is the best way to design this so that all of my logic works seamlessly?
I am pretty sure there is a better design than the one I have - what could that better way be?