/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.RedisReactiveCommandsImpl;
import com.lambdaworks.redis.api.rx.Success;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.PubSubCommandBuilder;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import com.lambdaworks.redis.pubsub.api.rx.ChannelMessage;
import com.lambdaworks.redis.pubsub.api.rx.PatternMessage;
import com.lambdaworks.redis.pubsub.api.rx.RedisPubSubReactiveCommands;
import java.util.Map;
import java.util.function.Supplier;
import rx.Observable;
import rx.Subscriber;
import rx.subscriptions.Subscriptions;

/**
 * Reactive ({@link Observable}-based) pub/sub command implementation.
 *
 * <p>Commands are built by a {@link PubSubCommandBuilder} and dispatched through the
 * observable factories inherited from {@link RedisReactiveCommandsImpl}. Incoming pub/sub
 * messages are exposed as observables via {@link #observeChannels()} and
 * {@link #observePatterns()}; each subscription to those observables registers its own
 * listener on the underlying {@link StatefulRedisPubSubConnection} and deregisters it when
 * the subscriber unsubscribes.
 *
 * @param <K> key (channel / pattern) type
 * @param <V> value (message) type
 */
public class RedisPubSubReactiveCommandsImpl<K, V>
extends RedisReactiveCommandsImpl<K, V>
implements RedisPubSubReactiveCommands<K, V> {
    /** Builds the pub/sub commands issued by this API; assigned once in the constructor. */
    private final PubSubCommandBuilder<K, V> commandBuilder;

    /**
     * Creates the reactive pub/sub API on top of the given connection.
     *
     * @param connection the pub/sub connection commands are sent on and listeners are registered with
     * @param codec codec handed to the superclass and to the {@link PubSubCommandBuilder}
     */
    public RedisPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        // NOTE(review): the superclass constructor presumably stores the connection already;
        // the explicit assignment is kept to preserve the original behavior.
        this.connection = connection;
        this.commandBuilder = new PubSubCommandBuilder<K, V>(codec);
    }

    /**
     * Registers {@code listener} on the underlying pub/sub connection.
     *
     * @param listener the listener to add
     */
    @Override
    public void addListener(RedisPubSubListener<K, V> listener) {
        this.getStatefulConnection().addListener(listener);
    }

    /**
     * Returns an observable that emits a {@link PatternMessage} for every pattern message
     * delivered to the connection's listeners.
     *
     * <p>Every subscription gets its own connection listener, so the returned observable can be
     * subscribed to more than once without the subscriptions interfering with each other.
     *
     * @return observable of pattern messages
     */
    @Override
    public Observable<PatternMessage<K, V>> observePatterns() {
        return Observable.create(new PubSubObservable<PatternMessage<K, V>>(() -> new PatternListener()));
    }

    /**
     * Returns an observable that emits a {@link ChannelMessage} for every channel message
     * delivered to the connection's listeners.
     *
     * <p>Every subscription gets its own connection listener, so the returned observable can be
     * subscribed to more than once without the subscriptions interfering with each other.
     *
     * @return observable of channel messages
     */
    @Override
    public Observable<ChannelMessage<K, V>> observeChannels() {
        return Observable.create(new PubSubObservable<ChannelMessage<K, V>>(() -> new ChannelListener()));
    }

    /**
     * Removes {@code listener} from the underlying pub/sub connection.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(RedisPubSubListener<K, V> listener) {
        this.getStatefulConnection().removeListener(listener);
    }

    /**
     * Issues the command built by {@code commandBuilder.psubscribe(patterns)}.
     *
     * @param patterns the patterns to subscribe to
     * @return the command observable converted by {@code getSuccessObservable}
     */
    @Override
    public Observable<Success> psubscribe(K ... patterns) {
        return this.getSuccessObservable(this.createObservable(() -> this.commandBuilder.psubscribe(patterns)));
    }

    /**
     * Issues the command built by {@code commandBuilder.punsubscribe(patterns)}.
     *
     * @param patterns the patterns to unsubscribe from
     * @return the command observable converted by {@code getSuccessObservable}
     */
    @Override
    public Observable<Success> punsubscribe(K ... patterns) {
        return this.getSuccessObservable(this.createObservable(() -> this.commandBuilder.punsubscribe(patterns)));
    }

    /**
     * Issues the command built by {@code commandBuilder.subscribe(channels)}.
     *
     * @param channels the channels to subscribe to
     * @return the command observable converted by {@code getSuccessObservable}
     */
    @Override
    public Observable<Success> subscribe(K ... channels) {
        return this.getSuccessObservable(this.createObservable(() -> this.commandBuilder.subscribe(channels)));
    }

    /**
     * Issues the command built by {@code commandBuilder.unsubscribe(channels)}.
     *
     * @param channels the channels to unsubscribe from
     * @return the command observable converted by {@code getSuccessObservable}
     */
    @Override
    public Observable<Success> unsubscribe(K ... channels) {
        return this.getSuccessObservable(this.createObservable(() -> this.commandBuilder.unsubscribe(channels)));
    }

    /**
     * Issues the command built by {@code commandBuilder.publish(channel, message)}.
     *
     * @param channel the channel to publish to
     * @param message the message to publish
     * @return observable of the command's {@code Long} result
     */
    @Override
    public Observable<Long> publish(K channel, V message) {
        return this.createObservable(() -> this.commandBuilder.publish(channel, message));
    }

    /**
     * Issues the command built by {@code commandBuilder.pubsubChannels(channel)} through
     * {@code createDissolvingObservable}.
     *
     * @param channel argument passed to the command builder
     * @return observable of keys produced by the command
     */
    @Override
    public Observable<K> pubsubChannels(K channel) {
        // NOTE(review): the raw cast is kept as-is because the declared return type of
        // createDissolvingObservable is not visible from this file — confirm before tightening.
        return (Observable)this.createDissolvingObservable(() -> this.commandBuilder.pubsubChannels(channel));
    }

    /**
     * Issues the command built by {@code commandBuilder.pubsubNumsub(channels)}.
     *
     * @param channels the channels passed to the command builder
     * @return observable of the command's map result
     */
    @Override
    public Observable<Map<K, Long>> pubsubNumsub(K ... channels) {
        return this.createObservable(() -> this.commandBuilder.pubsubNumsub(channels));
    }

    /**
     * Returns the underlying connection, narrowed to the pub/sub connection type.
     *
     * @return the stateful pub/sub connection this API operates on
     */
    @Override
    @SuppressWarnings("unchecked")
    public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
        return (StatefulRedisPubSubConnection<K, V>)super.getStatefulConnection();
    }

    /**
     * Returns the subscriber {@code listener} should emit to, or {@code null} if nothing must be
     * emitted. When the subscriber has already unsubscribed, it is sent {@code onCompleted()},
     * the listener is removed from the connection and its subscriber reference is cleared.
     */
    private <T> Subscriber<? super T> activeSubscriber(SubscriptionPubSubListener<K, V, T> listener) {
        Subscriber<? super T> current = listener.subscriber;
        if (current == null) {
            return null;
        }
        if (current.isUnsubscribed()) {
            current.onCompleted();
            this.removeListener(listener);
            listener.subscriber = null;
            return null;
        }
        return current;
    }

    /**
     * Connection listener bound to at most one {@link Subscriber}.
     *
     * @param <T> type of the items forwarded to the subscriber
     */
    private static class SubscriptionPubSubListener<K, V, T>
    extends RedisPubSubAdapter<K, V> {
        /** Target of forwarded messages; {@code null} before activation and after cleanup. */
        protected Subscriber<? super T> subscriber;

        /** Binds this listener to {@code subscriber}. */
        public void activate(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }
    }

    /** Forwards pattern messages to the bound subscriber as {@link PatternMessage}s. */
    private class PatternListener
    extends SubscriptionPubSubListener<K, V, PatternMessage<K, V>> {
        @Override
        public void message(K pattern, K channel, V message) {
            Subscriber<? super PatternMessage<K, V>> target = RedisPubSubReactiveCommandsImpl.this.activeSubscriber(this);
            if (target != null) {
                target.onNext(new PatternMessage<K, V>(pattern, channel, message));
            }
        }
    }

    /** Forwards channel messages to the bound subscriber as {@link ChannelMessage}s. */
    private class ChannelListener
    extends SubscriptionPubSubListener<K, V, ChannelMessage<K, V>> {
        @Override
        public void message(K channel, V message) {
            Subscriber<? super ChannelMessage<K, V>> target = RedisPubSubReactiveCommandsImpl.this.activeSubscriber(this);
            if (target != null) {
                target.onNext(new ChannelMessage<K, V>(channel, message));
            }
        }
    }

    /**
     * {@code OnSubscribe} that wires each new subscriber to a freshly created connection listener.
     *
     * @param <T> type of the emitted items
     */
    private class PubSubObservable<T>
    implements Observable.OnSubscribe<T> {
        /** Creates one listener per subscription so subscriptions never share listener state. */
        private final Supplier<? extends SubscriptionPubSubListener<K, V, T>> listenerFactory;

        public PubSubObservable(Supplier<? extends SubscriptionPubSubListener<K, V, T>> listenerFactory) {
            this.listenerFactory = listenerFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            final SubscriptionPubSubListener<K, V, T> listener = this.listenerFactory.get();
            listener.activate(subscriber);
            subscriber.onStart();
            RedisPubSubReactiveCommandsImpl.this.addListener(listener);
            // Deregister as soon as the subscriber unsubscribes instead of waiting for the next
            // incoming message; registered after addListener so an already-unsubscribed
            // subscriber triggers the removal immediately.
            subscriber.add(Subscriptions.create(() -> RedisPubSubReactiveCommandsImpl.this.removeListener(listener)));
        }
    }
}

