/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.rx;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTransferQueue;
import org.redisson.api.RFuture;
import org.redisson.rx.ElementsStream;
import org.redisson.rx.PublisherAdder;

/**
 * RxJava 3 adapter that exposes selected operations of a
 * {@link RedissonTransferQueue} as reactive types.
 *
 * @param <V> type of the elements held by the underlying queue
 */
public class RedissonTransferQueueRx<V> {
    private final RedissonTransferQueue<V> queue;

    /**
     * Creates an adapter around the given queue.
     *
     * @param queue queue that every operation of this adapter delegates to
     */
    public RedissonTransferQueueRx(RedissonTransferQueue<V> queue) {
        this.queue = queue;
    }

    /**
     * Builds a stream of elements by handing {@code queue::takeAsync} to
     * {@link ElementsStream#takeElements}.
     *
     * @return the {@link Flowable} produced by {@code ElementsStream}
     */
    public Flowable<V> takeElements() {
        return ElementsStream.takeElements(this.queue::takeAsync);
    }

    /**
     * Publishes the queue contents by reading them index by index.
     *
     * <p>Each downstream {@code request(n)} triggers up to {@code n} sequential
     * {@code getValueAsync(index)} calls. A {@code null} value is treated as the
     * end of the queue and completes the stream; a failed lookup terminates the
     * stream with that error.
     *
     * @return publisher of the values returned by {@code getValueAsync}
     */
    public Publisher<V> iterator() {
        final ReplayProcessor<V> p = ReplayProcessor.create();
        return p.doOnRequest(new LongConsumer() {
            // Index of the next element to fetch; advanced after every emitted value.
            private int currentIndex = 0;

            @Override
            public void accept(long n) throws Exception {
                // A non-positive demand asks for nothing; without this guard the
                // countdown below would never reach its stop condition.
                if (n <= 0L) {
                    return;
                }
                RedissonTransferQueueRx.this.queue.getValueAsync(currentIndex).onComplete((value, error) -> {
                    if (error != null) {
                        p.onError(error);
                        return;
                    }
                    if (value == null) {
                        // No element at this index: the queue has been fully read.
                        p.onComplete();
                        return;
                    }
                    p.onNext(value);
                    currentIndex++;
                    if (n <= 1L) {
                        // Requested amount satisfied; wait for the next request.
                        return;
                    }
                    try {
                        accept(n - 1L);
                    } catch (Exception nested) {
                        // Scheduling the next fetch failed: terminate the stream
                        // instead of leaving the subscriber without a signal.
                        p.onError(nested);
                    }
                });
            }
        });
    }

    /**
     * Adds every element emitted by the given publisher to the queue, using
     * {@code queue.addAsync} for each element via {@link PublisherAdder}.
     *
     * @param c publisher supplying the elements to add
     * @return the {@link Single} produced by {@code PublisherAdder.addAll}
     */
    public Single<Boolean> addAll(Publisher<? extends V> c) {
        return new PublisherAdder<V>() {

            @Override
            @SuppressWarnings("unchecked") // elements originate from a Publisher<? extends V>
            public RFuture<Boolean> add(Object o) {
                return RedissonTransferQueueRx.this.queue.addAsync((V) o);
            }
        }.addAll(c);
    }
}

