https://github.com/jianhong-li/LockFreeRingBuffer
This is my LockFreeRingBuffer, implemented in Java.
public class LockFreeRingBuffer<T> {
public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);
private final AtomicReferenceArray<T> buffer;
private final int bufferSize;
private final long bufferSizeMask;
private final AtomicLong writeIndex = new AtomicLong(0);
private final AtomicLong readIndex = new AtomicLong(0);
public LockFreeRingBuffer(int bufferSize) {
// Check if bufferSize is positive
if (bufferSize <= 1) {
throw new IllegalArgumentException("bufferSize must be positive");
}
// Check if bufferSize is power of 2
int zCnt = 0;
int _bufferSize = bufferSize;
while (_bufferSize > 0) {
if ((_bufferSize & 1) == 1) {
zCnt++;
}
if (zCnt > 1) {
throw new IllegalArgumentException("bufferSize must be power of 2");
}
_bufferSize = _bufferSize >> 1;
}
// Initialize buffer and bufferSize
this.buffer = new AtomicReferenceArray<>(bufferSize);
this.bufferSize = bufferSize;
this.bufferSizeMask = bufferSize - 1;
}
public int push(T value) {
// Ensure that the written data is valid
if (value == null) {
return -1;
}
long pWrite, pRead;
int loopCnt = 0;
for (; ; ) {
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get()); // push . _wIndex . Expect to read the latest version.
if (nextIndex(pWrite) == _rIndex) {
// buffer is full
return -2;
}
// Make sure that the current write pointer points to a NULL slot. That is, it can be written to. (Make sure that the take side has cleaned up the data
)
if (buffer.get(_wIndex) != null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
// Update the pointer first, then write the value. Make sure the ownership is written correctly
if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {
// Write value: Theoretically this position must be empty to write
if (buffer.compareAndSet(_wIndex, null, value)) {
// writeCnt.incrementAndGet();
return _wIndex;
}
// can not happen
throw new RuntimeException("state error");
}
}
}
public T pop() {
int loopCnt = 0;
long pRead, pWrite;
for (; ; ) {
// P_w == P_r , buffer is empty
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get());
if (_rIndex == _wIndex) {
// buffer is empty
return null;
}
T t = buffer.get(_rIndex); // There is no need to determine null here. However, it is a snapshot of pRead. So there might be a null situation.
if (t == null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: pop data retry [20] - buffer[{}] is null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
/* ************************************************
* pWrite
* |
* v
* [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
* ^
* |
* pRead
* ************************************************
* case: pRead = 1, pWrite = 1
* pWrite = pWrite + 1 = 2
* But we haven't had time to write the data yet. In this case, pRead = 1, pWrite = 2. The pRead location data is empty
.
* now, t==null will continue directly.
* after many loop,value at pRead effective finnaly. Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
* Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
*/
if (readIndex.compareAndSet(pRead, pRead + 1)) {
// pRead+1,
// Data indicating that the original pointer position can be safely manipulated. And the above is guaranteed to read a non-null value. That is, the competitive write with push is complete.
//
// set to null
boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);
// must be success if code has no bug
if (compareAndSet) {
// CAS success. t must be valid
return t;
}
logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
// can not happen
throw new RuntimeException("state error");
}
}
}
/**
* this function maybe inline by JIT.
*/
private int nextIndex(long currentIndex) {
return (int) ((currentIndex + 1) & bufferSizeMask);
}
/**
* this function maybe inline by JIT.
*/
private int makeIndex(long currentIndex) {
return (int) (currentIndex & bufferSizeMask);
}
// ======================== get / setter =======================
public long getReadCnt() {
return readIndex.get();
}
public long getWriteCnt() {
return writeIndex.get();
}
}