How to design a system that sends records and retries sending them if an acknowledgement is not received? [closed]
I am working on a project where I need to consume a large number of records and then send them to another system, which uses ZeroMQ.

Here is the flow:

  • Store all incoming records in a ConcurrentHashMap (CHM) from multiple threads. Records arrive at a very high rate.
  • From a background thread, which runs every 30 seconds, send these records from the CHM to the ZeroMQ servers.
  • After sending each record to the ZeroMQ servers, add it to a retry bucket as well, so that it can be retried after a certain time has passed if an acknowledgment has not yet been received for it.
  • We also have a poller runnable thread, which receives acknowledgments from the ZeroMQ servers indicating that records have been received. Once I get an acknowledgment back, I delete that record from the retry bucket so that it doesn't get retried.
  • Even if some records are sent twice, it's OK, but it is good to minimize this.

I am not sure what the best way to minimize this is in the scenario below.

Below is my Processor class, whose add() method is called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. In the constructor of the Processor class, I start the background thread, which runs every 30 seconds and pushes records from the same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below:


Processor

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
  }

  private void validateAndSend(final int partition,
      final ConcurrentLinkedQueue<DataHolder> dataHolders) {
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    while (!dataHolders.isEmpty()) {
        .........
        .........
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }
    // calling again with remaining values
    SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition, final DataHolder holder) {
    // store records in dataHolderByPartitionReference in a thread safe way
  }
}

And below is my SendToZeroMQ class, which sends a record to a set of ZeroMQ servers and retries it depending on whether an acknowledgment is delivered.

  • First, it sends a record to the ZeroMQ servers.
  • Then it adds the same record to retryBucket, from which it will be retried later, depending on whether an acknowledgment was received.
  • In the same class, I start a background thread, which runs every 1 minute and re-sends the records that are still in the retry bucket.
  • The same class also starts the ResponsePoller thread, which keeps running forever to see which of the previously sent records have been acknowledged. As soon as records are acknowledged, the ResponsePoller thread removes them from retryBucket so that they do not get retried.

SendToZeroMQ

public class SendToZeroMQ {
  // do I need both of these ScheduledExecutorService instances, or is one sufficient to start both of my threads?
  private final ScheduledExecutorService executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
      .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
      .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
          executeAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.MINUTES);
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // add to retry bucket
    retryBucket.put(address, encodedByteArray);
    return sent;
  }

  public boolean executeAsync(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }         
    Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
    long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
    byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
    return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
  }

  private Map<Long, byte[]> encode(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {

    // this address will be unique always
    long address = TestUtils.getAddress();
    Frame frame = new Frame(............);
    byte[] packedByteArray = frame.serialize();
    // this map will always have one entry in it.
    return ImmutableMap.of(address, packedByteArray);
  }

  public void removeFromRetryBucket(final long address) {
    retryBucket.invalidate(address);
  }
}

And below is my ResponsePoller class, which waits for the acknowledgments for all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is deleted from the retry bucket so that it doesn't get retried.

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();
  private static final int listenerPort = 8076;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry bucket since we got the acknowledgment for this record
              SendToZeroMQ.getInstance().removeFromRetryBucket(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

Question:

  • I am trying to see, from the design perspective, what is the best way to design this problem, so that all my logic works seamlessly?

  • I am pretty sure there is a better way to design this problem as compared to what I have - what that better way could be?

Odeen answered 26/1, 2017 at 4:21 Comment(10)
If the question is "Can everything be done in one single class?", the answer is no. I don't even have to look at any code or anything else to answer that. :) - Doubletree
Yeah, sorry, that's not my question. :) I shouldn't even have mentioned that. Mainly I am trying to see the best way to design the system for this kind of problem. The idea is very simple, but it looks like I have overcomplicated things. - Odeen
@david Why do you want to reinvent the wheel? Use a reliable message queue system like RabbitMQ, which has already implemented what you want. - Indiscernible
Actually, RabbitMQ might not be suitable for my problem here at all. I am already consuming records from Kafka, and then I have to send these records to another system that uses ZeroMQ. So I have to implement all of this logic in my code. In my question, by database I meant the ZeroMQ queue servers. - Odeen
@david Are you open to a high-level design, or do you want someone to help you refactor your code? The latter would make this question too broad for the SO format (in my opinion). - Hooks
@CKing I am mostly looking for a high-level design; as you can see, in one of my classes I didn't show the full code. I am just trying to see a design where my code would fit seamlessly without any duplication. If anyone can provide a template that explains what each class or method is doing, then I will mostly be able to fill things in. - Odeen
@david So you want us to explain the code to you and then explain the solution and where your solution fits in? - Hooks
@CKing Hmm, I think so. If I can understand it, then I will definitely be able to put it right. - Odeen
This kind of question is too broad for this site, but you could try asking it on softwareengineering.stackexchange.com - Thekla
@david Note: If you do ask this on SE, make sure your question has a narrow focus. If you ask with the same broadness as you did here, it will be closed there too. If necessary, break your question up into multiple ones, each asking about one thing that can have one answer. - Thekla

In my opinion, you should not need to worry about data reception acknowledgments at the application layer, as long as you are using TCP for the underlying communication.

In this case, since ZeroMQ is built on top of TCP with further optimizations, you need not worry about successful data transfer as long as there is no exception at the transport layer (which is propagated back to you so that you can handle it).

The way I see your problem is this: you are running Kafka consumer threads, which receive messages and forward them to another message queue (in this case ZMQ, which uses TCP and either guarantees successful message delivery or throws an exception at the lower layers of communication).

The simplest solution I can think of is to use a thread pool within each consumer and try to send the message using ZMQ. In case of a network error, you could easily queue that message for later consumption or logging, as long as your application daemon is running.

In the proposed solution I am assuming that message ordering is not part of the problem space, and that you are not looking to complicate things.
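
As a rough illustration of the thread-pool idea above, here is a minimal sketch. It makes several assumptions: Transport is a hypothetical interface standing in for the real ZeroMQ send call (it is not part of any ZeroMQ library), and PooledSender, resubmitFailed and failedCount are names invented for this example. Messages whose send fails or throws are parked in an in-memory queue for a later attempt or for logging.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the real ZeroMQ send call (not a ZeroMQ API).
interface Transport {
  boolean send(byte[] payload) throws Exception;
}

final class PooledSender {
  private final ExecutorService pool;
  private final Transport transport;
  // Messages whose send failed, kept for a later attempt or for logging.
  private final Queue<byte[]> failed = new ConcurrentLinkedQueue<>();

  PooledSender(int threads, Transport transport) {
    this.pool = Executors.newFixedThreadPool(threads);
    this.transport = transport;
  }

  // Hands the send to the pool so the consumer thread does not block on the network.
  Future<?> submit(final byte[] payload) {
    return pool.submit(() -> {
      try {
        if (!transport.send(payload)) {
          failed.add(payload);
        }
      } catch (Exception e) {
        failed.add(payload);
      }
    });
  }

  // Re-submits the messages that had failed when this method was called.
  // Only a snapshot is drained, so a message that keeps failing cannot loop forever here.
  int resubmitFailed() {
    int pending = failed.size();
    int resubmitted = 0;
    for (int i = 0; i < pending; i++) {
      byte[] payload = failed.poll();
      if (payload == null) {
        break;
      }
      submit(payload);
      resubmitted++;
    }
    return resubmitted;
  }

  int failedCount() {
    return failed.size();
  }

  // Stops accepting work and waits briefly for in-flight sends to finish.
  void shutdown() {
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The failed queue here lives only in memory, so it survives only as long as the application daemon is running, which matches the caveat above.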

Vacuity answered 2/2, 2017 at 23:8 Comment(4)
Yes, I agree that we shouldn't have to worry about acknowledgment since it uses TCP underneath, but we have to make sure that we are not losing any data, and that's why we need this. And yes, the order of messages is not important. - Odeen
To avoid losing data, as I suggested, you could add a backend store, within your consumer application, for messages that failed because of network errors. Admittedly, there is no guarantee that the store will not get corrupted by a hardware failure, but as I suggested, you can log and reprocess, since the order is not important. Honestly, you cannot design a 100% fault-proof system for inter-process communication, as there is always intermediate hardware involved. The best approach is to handle exceptions appropriately. - Vacuity
With all due respect, let me recall one of the key results of theoretical cybernetics, which makes the fair claim that it is always possible to create a 100% reliable system from unreliable components, provided the system uses well-defined interconnections that allow for sufficiently continuous self-diagnostics, self-identification and self-correction. - Guthry
That is exactly what handling error conditions (unreliable components) is. - Vacuity

I am trying to see, from the design perspective, what is the best way to design this problem, so that all my logic works seamlessly?

I am pretty sure there is a better way to design this problem as compared to what I have - what that better way could be?

I was trying to implement something similar, but reading from Spark Kafka and posting to another Kafka topic. A few things that helped me along the way were:

1) I used a strategy pattern to implement various exception-handling strategies. I took inspiration from ZooKeeper, which has various retry strategies such as RetryNTimes, ExponentialBackOff, Retry With Interval, etc.

2) Each of these strategies is used in a different context. I had to post my data to a variety of locations, and the exceptions could range from a bad request being sent to the network being unavailable. In the worst cases, where the network retry had failed N times, I saved the requests to a Cassandra database with appropriate messages, so that a cron or manual process could then retry or replay them by posting to another Kafka topic. A good caching strategy would have been enough, but we needed the data for further analytics as well, hence the persistence.

3) I prefer not to write extensive multithreading code but rather to hand it off to a framework that takes care of it for me. After a few years of facing nasty multithreading bugs (I am no expert in this area), I started favouring frameworks like Akka to handle the multithreading part for me.
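
To make point 1 more concrete, here is a minimal sketch of retry strategies behind a common interface. The names RetryPolicy, RetryNTimes, ExponentialBackoff and Retrier are written for this example only; they borrow the naming mentioned above but are not the classes of any existing library, and the semantics (for instance, how attempts are counted) are assumptions.

```java
import java.util.function.BooleanSupplier;

// Strategy interface: decides whether to retry and how long to wait first.
interface RetryPolicy {
  boolean shouldRetry(int failedAttempts);
  long delayMillis(int failedAttempts);
}

// Retries up to maxRetries times with a fixed pause between attempts.
final class RetryNTimes implements RetryPolicy {
  private final int maxRetries;
  private final long intervalMillis;

  RetryNTimes(int maxRetries, long intervalMillis) {
    this.maxRetries = maxRetries;
    this.intervalMillis = intervalMillis;
  }

  @Override
  public boolean shouldRetry(int failedAttempts) {
    return failedAttempts <= maxRetries;
  }

  @Override
  public long delayMillis(int failedAttempts) {
    return intervalMillis;
  }
}

// Doubles the pause after every failure, capped at maxMillis.
final class ExponentialBackoff implements RetryPolicy {
  private final int maxRetries;
  private final long baseMillis;
  private final long maxMillis;

  ExponentialBackoff(int maxRetries, long baseMillis, long maxMillis) {
    this.maxRetries = maxRetries;
    this.baseMillis = baseMillis;
    this.maxMillis = maxMillis;
  }

  @Override
  public boolean shouldRetry(int failedAttempts) {
    return failedAttempts <= maxRetries;
  }

  @Override
  public long delayMillis(int failedAttempts) {
    // The shift is bounded so that a large attempt count cannot overflow the long.
    int shift = Math.max(0, Math.min(failedAttempts - 1, 20));
    return Math.min(baseMillis << shift, maxMillis);
  }
}

final class Retrier {
  // Runs the action until it succeeds or the policy gives up; returns true on success.
  static boolean run(BooleanSupplier action, RetryPolicy policy) {
    int failures = 0;
    while (true) {
      if (action.getAsBoolean()) {
        return true;
      }
      failures++;
      if (!policy.shouldRetry(failures)) {
        return false; // the caller can now persist the record for a later replay
      }
      try {
        Thread.sleep(policy.delayMillis(failures));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

When Retrier.run returns false, the caller is at the "worst case" described in point 2 and can hand the record to whatever persistent store is in use.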

Triplenerved answered 3/2, 2017 at 12:4 Comment(0)

I think your situation is a perfect candidate for the "Saga" design pattern ("Sagas" by Hector Garcia-Molina and Kenneth Salem).

Basically, you have a long-running business transaction that consists of several sends (retries) over time, until the status changes to acknowledged. Express this flow as an entity of its own (a saga), which has a method to execute the retry as well as a method to acknowledge the receipt. Once acknowledged, it should not execute the retry anymore.

How you store and handle the saga does not really matter and has no direct impact on the pattern itself. You can use any technology that runs at a fixed interval, retrieves all sagas that are not yet acknowledged, executes them, and saves them. You should also have an acknowledgment receiver endpoint that retrieves the saga, marks it as acknowledged, and then saves it.

Many message brokers and service buses have built-in retry capabilities. You can use what you already have (if it has retry capabilities), or you can use another one that does. Or, as I said before, you can simply execute sagas from your own application at a fixed interval.
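
The saga entity described above could be sketched roughly as follows. This is an in-memory illustration only: SendSaga, SagaStore and their methods are hypothetical names, the sender is passed in as a plain Consumer rather than a real ZeroMQ socket, and time is supplied by the caller so that the retry rule is easy to follow. A saga is re-sent only if it has not been acknowledged and its last send is older than the retry interval, which also helps keep duplicate sends down.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// One saga per record: it knows how to (re)send itself and how to be acknowledged.
final class SendSaga {
  private final byte[] payload;
  private boolean acknowledged;
  private boolean sentOnce;
  private long lastSentAtMillis;

  SendSaga(byte[] payload) {
    this.payload = payload;
  }

  // Sends unless the saga is acknowledged or the last send is still too recent.
  // Note: the send runs while holding this saga's lock, which keeps the sketch simple.
  synchronized boolean executeIfDue(long nowMillis, long retryAfterMillis,
      Consumer<byte[]> sender) {
    if (acknowledged) {
      return false;
    }
    if (sentOnce && nowMillis - lastSentAtMillis < retryAfterMillis) {
      return false;
    }
    sentOnce = true;
    lastSentAtMillis = nowMillis;
    sender.accept(payload);
    return true;
  }

  synchronized void acknowledge() {
    acknowledged = true;
  }
}

// Hypothetical in-memory store; any persistent store could play the same role.
final class SagaStore {
  private final Map<Long, SendSaga> pending = new ConcurrentHashMap<>();
  private final long retryAfterMillis;

  SagaStore(long retryAfterMillis) {
    this.retryAfterMillis = retryAfterMillis;
  }

  // Registers the saga before the first send, so an early acknowledgment can find it.
  void start(long address, byte[] payload, long nowMillis, Consumer<byte[]> sender) {
    SendSaga saga = new SendSaga(payload);
    pending.put(address, saga);
    saga.executeIfDue(nowMillis, retryAfterMillis, sender);
  }

  // Called by the acknowledgment receiver endpoint.
  void acknowledge(long address) {
    SendSaga saga = pending.remove(address);
    if (saga != null) {
      saga.acknowledge();
    }
  }

  // Called at a fixed interval; returns how many sagas were actually re-sent.
  int retryDue(long nowMillis, Consumer<byte[]> sender) {
    int resent = 0;
    for (SendSaga saga : pending.values()) {
      if (saga.executeIfDue(nowMillis, retryAfterMillis, sender)) {
        resent++;
      }
    }
    return resent;
  }

  int pendingCount() {
    return pending.size();
  }
}
```

In the setup from the question, retryDue would be driven by the scheduled retry thread and acknowledge by the ResponsePoller, under the assumption that both share one SagaStore instance.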

Kraus answered 3/2, 2017 at 18:22 Comment(1)
Your idea is interesting, but your answer is not comprehensive enough for the OP to implement it in their actual code. - Grater
