/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AckCollector;
import org.jgroups.util.TimeScheduler;

@Experimental
@MBean(description="Implements synchronous acks for messages which have their RSVP flag set)")
public class RSVP
extends Protocol {
    @Property(description="Max time in milliseconds to block for an RSVP'ed message (0 blocks forever).")
    protected long timeout = 10000L;
    @Property(description="Whether an exception should be thrown when the timeout kicks in, and we haven't yet received all acks. An exception would be thrown all the way up to JChannel.send()")
    protected boolean throw_exception_on_timeout = true;
    @Property(description="When true, we pass the message up to the application and only then send an ack. When false, we send an ack first and only then pass the message up to the application.")
    protected boolean ack_on_delivery = true;
    @Property(description="Interval (in milliseconds) at which we resend the RSVP request. Needs to be < timeout. 0 disables it.")
    protected long resend_interval = 2000L;
    protected short current_id = 0;
    protected TimeScheduler timer;
    protected volatile List<Address> members = new ArrayList<Address>();
    protected Address local_addr;
    protected final Map<Short, Entry> ids = new HashMap<Short, Entry>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ManagedAttribute(description="Number of pending RSVP requests")
    public int getPendingRsvpRequests() {
        Map<Short, Entry> map = this.ids;
        synchronized (map) {
            return this.ids.size();
        }
    }

    @Override
    public void init() throws Exception {
        super.init();
        this.timer = this.getTransport().getTimer();
        if (this.timeout > 0L && this.resend_interval > 0L && this.resend_interval >= this.timeout) {
            this.log.warn("resend_interval (" + this.resend_interval + ") is >= timeout (" + this.timeout + "); setting " + "resend_interval to timeout / 3");
            this.resend_interval = this.timeout / 3L;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void destroy() {
        Map<Short, Entry> map = this.ids;
        synchronized (map) {
            for (Entry entry : this.ids.values()) {
                entry.destroy();
            }
            this.ids.clear();
        }
        super.destroy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Message msg = (Message)evt.getArg();
                if (!msg.isFlagSet(Message.Flag.RSVP)) break;
                short next_id = this.getNextId();
                RsvpHeader hdr = new RsvpHeader(1, next_id);
                msg.putHeader(this.id, hdr);
                Address target = msg.getDest();
                Entry entry = target != null ? new Entry(target) : new Entry(this.members);
                Object retval = null;
                try {
                    Map<Short, Entry> map = this.ids;
                    synchronized (map) {
                        this.ids.put(next_id, entry);
                    }
                    entry.startTask(next_id);
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": " + hdr.typeToString() + " --> " + target);
                    }
                    retval = this.down_prot.down(evt);
                    entry.block(this.timeout);
                }
                catch (TimeoutException e) {
                    if (this.throw_exception_on_timeout) {
                        throw e;
                    }
                    if (this.log.isWarnEnabled()) {
                        this.log.warn("message ran into a timeout, missing acks: " + entry);
                    }
                }
                finally {
                    Map<Short, Entry> e = this.ids;
                    synchronized (e) {
                        Entry tmp = this.ids.remove(next_id);
                        if (tmp != null) {
                            tmp.destroy();
                        }
                    }
                }
                return retval;
            }
            case 6: {
                View view = (View)evt.getArg();
                this.members = view.getMembers();
                Map<Short, Entry> map = this.ids;
                synchronized (map) {
                    Iterator<Map.Entry<Short, Entry>> it = this.ids.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry entry = it.next().getValue();
                        if (entry == null || !entry.retainAll(view.getMembers()) || entry.size() != 0) continue;
                        entry.destroy();
                        it.remove();
                    }
                    break;
                }
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
            }
        }
        return this.down_prot.down(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Message msg = (Message)evt.getArg();
                if (!msg.isFlagSet(Message.Flag.RSVP)) break;
                RsvpHeader hdr = (RsvpHeader)msg.getHeader(this.id);
                if (hdr == null) {
                    this.log.error("message with RSVP flag needs to have an RsvpHeader");
                    break;
                }
                Address sender = msg.getSrc();
                if (this.log.isTraceEnabled()) {
                    this.log.trace(this.local_addr + ": " + hdr.typeToString() + " <-- " + sender);
                }
                switch (hdr.type) {
                    case 1: {
                        if (this.ack_on_delivery) {
                            try {
                                Object object = this.up_prot.up(evt);
                                return object;
                            }
                            finally {
                                this.sendResponse(sender, hdr.id);
                            }
                        }
                        this.sendResponse(sender, hdr.id);
                        return this.up_prot.up(evt);
                    }
                    case 2: {
                        return null;
                    }
                    case 3: {
                        this.handleResponse(msg.getSrc(), hdr.id);
                        return null;
                    }
                }
            }
        }
        return this.up_prot.up(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleResponse(Address member, short id) {
        Map<Short, Entry> map = this.ids;
        synchronized (map) {
            Entry entry = this.ids.get(id);
            if (entry != null) {
                entry.ack(member);
                if (entry.size() == 0) {
                    entry.destroy();
                    this.ids.remove(id);
                }
            }
        }
    }

    protected void sendResponse(Address dest, short id) {
        try {
            Message msg = new Message(dest);
            msg.setFlag(Message.Flag.RSVP, Message.Flag.OOB);
            RsvpHeader hdr = new RsvpHeader(3, id);
            msg.putHeader(this.id, hdr);
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": " + hdr.typeToString() + " --> " + dest);
            }
            this.down_prot.down(new Event(1, msg));
        }
        catch (Throwable t) {
            this.log.error("failed sending response", t);
        }
    }

    protected synchronized short getNextId() {
        short s = this.current_id;
        this.current_id = (short)(s + 1);
        return s;
    }

    protected static class RsvpHeader
    extends Header {
        protected static final byte REQ = 1;
        protected static final byte REQ_ONLY = 2;
        protected static final byte RSP = 3;
        protected byte type;
        protected short id;

        public RsvpHeader() {
        }

        public RsvpHeader(byte type, short id) {
            this.type = type;
            this.id = id;
        }

        @Override
        public int size() {
            return 3;
        }

        @Override
        public void writeTo(DataOutput out) throws Exception {
            out.writeByte(this.type);
            out.writeShort(this.id);
        }

        @Override
        public void readFrom(DataInput in) throws Exception {
            this.type = in.readByte();
            this.id = in.readShort();
        }

        @Override
        public String toString() {
            String tmp = this.typeToString();
            return tmp + "(" + this.id + ")";
        }

        protected String typeToString() {
            switch (this.type) {
                case 1: {
                    return "REQ";
                }
                case 2: {
                    return "REQ-ONLY";
                }
                case 3: {
                    return "RSP";
                }
            }
            return "unknown";
        }
    }

    protected class Entry {
        protected final AckCollector ack_collector;
        protected final Address target;
        protected Future<?> resend_task;

        protected Entry(Address member) {
            this.target = member;
            this.ack_collector = new AckCollector(member);
        }

        protected Entry(Collection<Address> members) {
            this.target = null;
            this.ack_collector = new AckCollector(members);
        }

        protected void startTask(final short rsvp_id) {
            if (this.resend_task != null && !this.resend_task.isDone()) {
                this.resend_task.cancel(false);
            }
            this.resend_task = RSVP.this.timer.scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    if (Entry.this.ack_collector.size() == 0) {
                        Entry.this.cancelTask();
                        return;
                    }
                    Message msg = new Message(Entry.this.target);
                    msg.setFlag(Message.Flag.RSVP);
                    RsvpHeader hdr = new RsvpHeader(2, rsvp_id);
                    msg.putHeader(RSVP.this.id, hdr);
                    if (RSVP.this.log.isTraceEnabled()) {
                        RSVP.this.log.trace(RSVP.this.local_addr + ": " + hdr.typeToString() + " --> " + Entry.this.target);
                    }
                    RSVP.this.down_prot.down(new Event(1, msg));
                }
            }, RSVP.this.resend_interval, RSVP.this.resend_interval, TimeUnit.MILLISECONDS);
        }

        protected void cancelTask() {
            if (this.resend_task != null) {
                this.resend_task.cancel(false);
            }
            this.ack_collector.destroy();
        }

        protected void ack(Address member) {
            this.ack_collector.ack(member);
        }

        protected boolean retainAll(Collection<Address> members) {
            return this.ack_collector.retainAll(members);
        }

        protected int size() {
            return this.ack_collector.size();
        }

        protected void block(long timeout) throws TimeoutException {
            this.ack_collector.waitForAllAcks(timeout);
        }

        protected void destroy() {
            this.cancelTask();
        }

        public String toString() {
            return this.ack_collector.toString();
        }
    }
}

