/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.nukleus.internal;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.UnsafeAccess;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RecordDescriptor;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

public class ManyToOneRingBuffer
implements RingBuffer {
    public static final int PADDING_MSG_TYPE_ID = -1;
    private static final int INSUFFICIENT_CAPACITY = -2;
    private final int capacity;
    private final int maxMsgLength;
    private final int tailPositionIndex;
    private final int headCachePositionIndex;
    private final int headPositionIndex;
    private final int correlationIdCounterIndex;
    private final int consumerHeartbeatIndex;
    private final AtomicBuffer buffer;

    public ManyToOneRingBuffer(AtomicBuffer buffer) {
        this.buffer = buffer;
        RingBufferDescriptor.checkCapacity((int)buffer.capacity());
        this.capacity = buffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH;
        buffer.verifyAlignment();
        this.maxMsgLength = this.capacity / 8;
        this.tailPositionIndex = this.capacity + RingBufferDescriptor.TAIL_POSITION_OFFSET;
        this.headCachePositionIndex = this.capacity + RingBufferDescriptor.HEAD_CACHE_POSITION_OFFSET;
        this.headPositionIndex = this.capacity + RingBufferDescriptor.HEAD_POSITION_OFFSET;
        this.correlationIdCounterIndex = this.capacity + RingBufferDescriptor.CORRELATION_COUNTER_OFFSET;
        this.consumerHeartbeatIndex = this.capacity + RingBufferDescriptor.CONSUMER_HEARTBEAT_OFFSET;
    }

    public int capacity() {
        return this.capacity;
    }

    public boolean write(int msgTypeId, DirectBuffer srcBuffer, int srcIndex, int length) {
        RecordDescriptor.checkTypeId((int)msgTypeId);
        this.checkMsgLength(length);
        boolean isSuccessful = false;
        AtomicBuffer buffer = this.buffer;
        int recordLength = length + 8;
        int recordIndex = this.claimCapacity(buffer, recordLength);
        if (-2 != recordIndex) {
            buffer.putInt(RecordDescriptor.typeOffset((int)recordIndex), msgTypeId);
            buffer.putBytes(RecordDescriptor.encodedMsgOffset((int)recordIndex), srcBuffer, srcIndex, length);
            buffer.putIntOrdered(RecordDescriptor.lengthOffset((int)recordIndex), recordLength);
            isSuccessful = true;
        }
        return isSuccessful;
    }

    public int read(MessageHandler handler) {
        return this.read(handler, Integer.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int read(MessageHandler handler, int messageCountLimit) {
        int bytesRead;
        int messagesRead = 0;
        AtomicBuffer buffer = this.buffer;
        long head = buffer.getLong(this.headPositionIndex);
        int capacity = this.capacity;
        int headIndex = (int)head & capacity - 1;
        int maxBlockLength = Math.min(capacity - headIndex, capacity >> 1);
        try {
            int recordLength;
            for (bytesRead = 0; bytesRead < maxBlockLength && messagesRead < messageCountLimit; bytesRead += BitUtil.align((int)recordLength, (int)8)) {
                int recordIndex = headIndex + bytesRead;
                recordLength = buffer.getIntVolatile(RecordDescriptor.lengthOffset((int)recordIndex));
                if (recordLength > 0) continue;
                break;
            }
        }
        finally {
            if (bytesRead != 0) {
                buffer.putLongOrdered(this.headPositionIndex, head + (long)bytesRead);
            }
        }
        return messagesRead;
    }

    public int maxMsgLength() {
        return this.maxMsgLength;
    }

    public long nextCorrelationId() {
        return this.buffer.getAndAddLong(this.correlationIdCounterIndex, 1L);
    }

    public AtomicBuffer buffer() {
        return this.buffer;
    }

    public void consumerHeartbeatTime(long time) {
        this.buffer.putLongOrdered(this.consumerHeartbeatIndex, time);
    }

    public long consumerHeartbeatTime() {
        return this.buffer.getLongVolatile(this.consumerHeartbeatIndex);
    }

    public long producerPosition() {
        return this.buffer.getLongVolatile(this.tailPositionIndex);
    }

    public long consumerPosition() {
        return this.buffer.getLongVolatile(this.headPositionIndex);
    }

    public int size() {
        long tail;
        long headBefore;
        long headAfter = this.buffer.getLongVolatile(this.headPositionIndex);
        do {
            headBefore = headAfter;
            tail = this.buffer.getLongVolatile(this.tailPositionIndex);
        } while ((headAfter = this.buffer.getLongVolatile(this.headPositionIndex)) != headBefore);
        return (int)(tail - headAfter);
    }

    public boolean unblock() {
        AtomicBuffer buffer = this.buffer;
        int mask = this.capacity - 1;
        int consumerIndex = (int)(buffer.getLongVolatile(this.headPositionIndex) & (long)mask);
        int producerIndex = (int)(buffer.getLongVolatile(this.tailPositionIndex) & (long)mask);
        if (producerIndex == consumerIndex) {
            return false;
        }
        boolean unblocked = false;
        int length = buffer.getIntVolatile(consumerIndex);
        if (length < 0) {
            buffer.putInt(RecordDescriptor.typeOffset((int)consumerIndex), -1);
            buffer.putIntOrdered(RecordDescriptor.lengthOffset((int)consumerIndex), -length);
            unblocked = true;
        } else if (0 == length) {
            int limit = producerIndex > consumerIndex ? producerIndex : this.capacity;
            int i = consumerIndex + 8;
            do {
                if (0 == (length = buffer.getIntVolatile(i))) continue;
                if (!ManyToOneRingBuffer.scanBackToConfirmStillZeroed(buffer, i, consumerIndex)) break;
                buffer.putInt(RecordDescriptor.typeOffset((int)consumerIndex), -1);
                buffer.putIntOrdered(RecordDescriptor.lengthOffset((int)consumerIndex), i - consumerIndex);
                unblocked = true;
                break;
            } while ((i += 8) < limit);
        }
        return unblocked;
    }

    private static boolean scanBackToConfirmStillZeroed(AtomicBuffer buffer, int from, int limit) {
        boolean allZeros = true;
        for (int i = from - 8; i >= limit; i -= 8) {
            if (0 == buffer.getIntVolatile(i)) continue;
            allZeros = false;
            break;
        }
        return allZeros;
    }

    private void checkMsgLength(int length) {
        if (length > this.maxMsgLength) {
            throw new IllegalArgumentException(String.format("encoded message exceeds maxMsgLength of %d, length=%d", this.maxMsgLength, length));
        }
    }

    private int claimCapacity(AtomicBuffer buffer, int recordLength) {
        int padding;
        long tail;
        int tailIndex;
        int alignedRecordLength = BitUtil.align((int)recordLength, (int)8);
        int capacity = this.capacity;
        int tailPositionIndex = this.tailPositionIndex;
        int headCachePositionIndex = this.headCachePositionIndex;
        int mask = capacity - 1;
        long requiredCapacity = alignedRecordLength + 8;
        long head = buffer.getLongVolatile(headCachePositionIndex);
        do {
            int availableCapacity;
            if (requiredCapacity > (long)(availableCapacity = capacity - (int)((tail = buffer.getLongVolatile(tailPositionIndex)) - head))) {
                head = buffer.getLongVolatile(this.headPositionIndex);
                if (requiredCapacity > (long)(capacity - (int)(tail - head))) {
                    return -2;
                }
                buffer.putLongOrdered(headCachePositionIndex, head);
            }
            padding = 0;
            tailIndex = (int)tail & mask;
            int toBufferEndLength = capacity - tailIndex;
            if (requiredCapacity <= (long)toBufferEndLength) continue;
            int headIndex = (int)head & mask;
            if (requiredCapacity > (long)headIndex) {
                head = buffer.getLongVolatile(this.headPositionIndex);
                headIndex = (int)head & mask;
                if (requiredCapacity > (long)headIndex) {
                    return -2;
                }
                buffer.putLongOrdered(headCachePositionIndex, head);
            }
            padding = toBufferEndLength;
        } while (!buffer.compareAndSetInt(RecordDescriptor.lengthOffset((int)tailIndex), 0, -recordLength));
        UnsafeAccess.UNSAFE.storeFence();
        if (0 != padding) {
            buffer.putInt(RecordDescriptor.lengthOffset((int)0), -recordLength);
            UnsafeAccess.UNSAFE.storeFence();
            buffer.putInt(RecordDescriptor.typeOffset((int)tailIndex), -1);
            buffer.putIntOrdered(RecordDescriptor.lengthOffset((int)tailIndex), padding);
            tailIndex = 0;
        }
        buffer.putInt(RecordDescriptor.lengthOffset((int)(tailIndex + alignedRecordLength)), 0);
        buffer.putLongOrdered(tailPositionIndex, tail + (long)alignedRecordLength + (long)padding);
        return tailIndex;
    }
}

