/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.spark.streaming;

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.spark.streaming.MessageRetryManager;
import org.apache.rocketmq.spark.streaming.MessageSet;

/**
 * {@link MessageRetryManager} that tracks in-flight {@link MessageSet}s in a cache keyed by
 * message-set id and pushes failed or expired sets back onto a queue for redelivery.
 *
 * <p>A background timer sweeps the cache every {@value #SWEEP_PERIOD_MILLIS} ms and calls
 * {@link #fail(String)} for every entry whose timestamp is at least {@code ttl} milliseconds old.
 */
public class DefaultMessageRetryManager
implements MessageRetryManager {
    /** Delay before the first expiry sweep and interval between sweeps, in milliseconds. */
    private static final long SWEEP_PERIOD_MILLIS = 5000L;

    /**
     * In-flight message sets keyed by id. Volatile because {@link #setCache(Map)} may replace
     * the map while the timer thread is reading this field.
     */
    private volatile Map<String, MessageSet> cache = new ConcurrentHashMap<String, MessageSet>(500);
    /** Destination for message sets that are scheduled for another delivery attempt. */
    private final BlockingQueue<MessageSet> queue;
    /** A message set is retried only while its retry count is below this value. */
    private final int maxRetry;
    /** Age in milliseconds after which a cached message set is treated as failed. */
    private final int ttl;

    /**
     * Creates the manager and starts the periodic expiry sweep.
     *
     * @param queue    queue that receives message sets to be retried
     * @param maxRetry retry-count limit; sets whose count has reached it are dropped on failure
     * @param ttl      maximum age in milliseconds of a cached set before it is failed
     */
    public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, int maxRetry, final int ttl) {
        this.queue = queue;
        this.maxRetry = maxRetry;
        this.ttl = ttl;
        // Daemon timer: the sweep thread is never cancelled, so it must not keep the JVM alive.
        Timer sweeper = new Timer("DefaultMessageRetryManager-sweeper", true);
        sweeper.scheduleAtFixedRate(new TimerTask(){

            @Override
            public void run() {
                long now = System.currentTimeMillis();
                for (Map.Entry<String, MessageSet> entry : DefaultMessageRetryManager.this.cache.entrySet()) {
                    if (now - entry.getValue().getTimestamp() < (long)ttl) continue;
                    DefaultMessageRetryManager.this.fail(entry.getKey());
                }
            }
        }, SWEEP_PERIOD_MILLIS, SWEEP_PERIOD_MILLIS);
    }

    /**
     * Acknowledges a message set by removing it from the cache.
     *
     * @param id id of the message set; unknown ids are ignored
     */
    @Override
    public void ack(String id) {
        this.cache.remove(id);
    }

    /**
     * Marks a message set as failed. The set is removed from the cache and, if
     * {@link #needRetry(MessageSet)} allows it, its retry count is incremented, its timestamp
     * is reset to {@code 0} and it is put back on the queue (blocking while the queue is full).
     * Sets that have used up their retries are dropped.
     *
     * <p>If the calling thread is interrupted while waiting on the queue, the set is not
     * re-queued and the thread's interrupt status is restored.
     *
     * @param id id of the message set; unknown ids are ignored
     */
    @Override
    public void fail(String id) {
        MessageSet messageSet = this.cache.remove(id);
        if (messageSet == null) {
            return;
        }
        if (this.needRetry(messageSet)) {
            messageSet.setRetries(messageSet.getRetries() + 1);
            messageSet.setTimestamp(0L);
            try {
                this.queue.put(messageSet);
            }
            catch (InterruptedException interruptedException) {
                // Preserve the interrupt so the caller can observe it instead of losing it here.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts tracking a message set: stamps it with the current time and stores it in the
     * cache under its id, replacing any previous entry with the same id.
     *
     * @param messageSet message set to track
     */
    @Override
    public void mark(MessageSet messageSet) {
        messageSet.setTimestamp(System.currentTimeMillis());
        this.cache.put(messageSet.getId(), messageSet);
    }

    /**
     * Tells whether a message set may be retried.
     *
     * @param messageSet message set to check
     * @return {@code true} if its retry count is below the configured maximum
     */
    @Override
    public boolean needRetry(MessageSet messageSet) {
        return messageSet.getRetries() < this.maxRetry;
    }

    /**
     * Replaces the backing cache. The timer thread iterates this map while other threads may
     * modify it, so the supplied map should tolerate concurrent access.
     *
     * @param cache new map of message-set id to message set
     */
    public void setCache(Map<String, MessageSet> cache) {
        this.cache = cache;
    }
}

