/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.messaging.core;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.Router;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
import org.jboss.util.timeout.Timeout;
import org.jboss.util.timeout.TimeoutTarget;

public abstract class ChannelSupport
implements Channel {
    /** Logger shared by all instances of this class. */
    private static final Logger log = Logger.getLogger(ChannelSupport.class);
    /** Whether trace logging was enabled when this instance was created (sampled once by the initializer). */
    private boolean trace = log.isTraceEnabled();
    /** Identifier of this channel; passed to the persistence manager and printed by toString(). */
    protected long channelID;
    /**
     * Router that references are pushed through. Not assigned by this class's constructor
     * (presumably set by subclasses -- confirm); close() sets it to null and checkClosed()
     * treats null as "closed".
     */
    protected Router router;
    /** Message store supplied at construction; must be non-null. */
    protected MessageStore ms;
    /**
     * Delivery gate: deliverInternal() returns immediately while this is false. Set to true when a
     * receiver is added or deliver() is called, set to false when the router is emptied, and updated
     * from the result of every router.handle() call.
     */
    protected boolean receiversReady;
    /** In-memory references waiting to be delivered, ordered by message priority. */
    protected PriorityLinkedList messageRefs;
    /** When false, reliable messages are rejected by this channel. */
    protected boolean acceptReliableMessages;
    /** When true, reliable references are also added to / removed from the persistence manager. */
    protected boolean recoverable;
    /** Persistence manager supplied at construction; must be non-null. */
    protected PersistenceManager pm;
    /** Monitor used around access to messageRefs, receiversReady, active and maxSize. */
    protected Object refLock;
    /** handle() returns null without doing anything while this is false. */
    protected boolean active = true;
    /** Number of references handed to a receiver and not yet acknowledged or cancelled. */
    protected SynchronizedInt deliveringCount;
    /** Pending Timeout objects for references whose delivery is scheduled in the future; synchronized on itself. */
    protected Set scheduledDeliveries;
    /** Maximum number of messages the channel holds; -1 disables the limit. */
    protected int maxSize;
    /** Running count of references successfully handled by handleInternal(). */
    protected SynchronizedInt messagesAdded;

    /**
     * Initialises the state shared by all channel implementations.
     *
     * @param channelID identifier of this channel
     * @param ms message store; must not be {@code null}
     * @param pm persistence manager; must not be {@code null}
     * @param acceptReliableMessages whether reliable messages may be handled by this channel
     * @param recoverable whether reliable references are also written to the persistence manager
     * @param maxSize maximum number of messages, or {@code -1} for no limit
     * @throws IllegalArgumentException if {@code ms} or {@code pm} is {@code null}
     */
    protected ChannelSupport(long channelID, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int maxSize) {
        if (this.trace) {
            // Report the actual recoverable flag. The persistence manager is mandatory (see the
            // check below), so deriving the wording from "pm != null" always printed "recoverable".
            log.trace("creating " + (recoverable ? "recoverable " : "non-recoverable ") + "channel[" + channelID + "]");
        }
        if (ms == null) {
            throw new IllegalArgumentException("ChannelSupport requires a non-null message store");
        }
        if (pm == null) {
            throw new IllegalArgumentException("ChannelSupport requires a non-null persistence manager");
        }
        this.ms = ms;
        this.pm = pm;
        this.channelID = channelID;
        this.acceptReliableMessages = acceptReliableMessages;
        this.recoverable = recoverable;
        this.messageRefs = new BasicPriorityLinkedList(10);
        this.refLock = new Object();
        this.deliveringCount = new SynchronizedInt(0);
        this.scheduledDeliveries = new HashSet();
        this.maxSize = maxSize;
        this.messagesAdded = new SynchronizedInt(0);
    }

    /**
     * Entry point for a reference arriving at this channel.
     *
     * @param sender the observer passed through to {@code handleInternal}
     * @param ref the reference to handle
     * @param tx the transaction to enlist in, or {@code null} for non-transactional handling
     * @return the resulting delivery, or {@code null} when the channel is inactive or the
     *         reference was not accepted
     * @throws IllegalStateException if the channel is active but has been closed
     */
    public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx) {
        if (active) {
            checkClosed();
            // Public entry point always asks for persistence.
            return handleInternal(sender, ref, tx, true);
        }
        return null;
    }

    /**
     * Acknowledges a delivery, optionally inside a transaction.
     *
     * @param d the delivery being acknowledged
     * @param tx the transaction, or {@code null} to acknowledge immediately
     * @throws Throwable whatever {@code acknowledgeInternal} propagates
     */
    public void acknowledge(Delivery d, Transaction tx) throws Throwable {
        if (trace) {
            if (tx == null) {
                log.trace("acknowledging " + d + " non-transactionally");
            } else {
                log.trace("acknowledging " + d + " transactionally in " + tx);
            }
        }
        acknowledgeInternal(d, tx, true);
    }

    /**
     * Cancels a delivery: updates the persisted delivery count for reliable messages, decrements
     * the delivering count, and either re-schedules the reference (when it carries a future
     * scheduled delivery time) or puts it back at the head of the in-memory list.
     *
     * @param del the delivery to cancel
     * @throws Throwable on persistence or internal failure
     */
    public void cancel(Delivery del) throws Throwable {
        MessageReference cancelled = del.getReference();
        boolean reliable = cancelled.getMessage().isReliable();
        if (reliable) {
            pm.updateDeliveryCount(channelID, cancelled);
        }
        deliveringCount.decrement();
        boolean rescheduled = checkAndSchedule(cancelled);
        if (rescheduled) {
            return;
        }
        cancelInternal(cancelled);
    }

    /**
     * Registers a receiver with the router and marks the channel as having receivers ready,
     * regardless of whether the router reported the receiver as newly added.
     *
     * @param r the receiver to add
     * @return the router's answer to {@code add(r)}
     */
    public boolean add(Receiver r) {
        if (trace) {
            log.trace(this + " attempting to add receiver " + r);
        }
        final boolean added = router.add(r);
        if (trace) {
            String negation = added ? "" : " NOT";
            log.trace("receiver " + r + negation + " added");
        }
        synchronized (refLock) {
            receiversReady = true;
        }
        return added;
    }

    /**
     * Unregisters a receiver from the router. When the removal succeeds and the router has no
     * receivers left, the channel is marked as not ready for delivery.
     *
     * @param r the receiver to remove
     * @return the router's answer to {@code remove(r)}
     */
    public boolean remove(Receiver r) {
        final boolean removed = router.remove(r);
        if (removed) {
            boolean anyReceiverLeft = router.iterator().hasNext();
            if (!anyReceiverLeft) {
                synchronized (refLock) {
                    receiversReady = false;
                }
            }
        }
        if (trace) {
            String outcome = removed ? " removed " : " did NOT remove ";
            log.trace(this + outcome + r);
        }
        return removed;
    }

    /**
     * Removes every receiver from the router and marks the channel as not ready for delivery.
     */
    public void clear() {
        router.clear();
        synchronized (refLock) {
            receiversReady = false;
        }
    }

    /**
     * Delegates to the router.
     *
     * @param r the receiver to look for
     * @return whatever the router's {@code contains(r)} reports
     */
    public boolean contains(Receiver r) {
        return router.contains(r);
    }

    /**
     * Delegates to the router.
     *
     * @return the iterator produced by the router
     */
    public Iterator iterator() {
        return router.iterator();
    }

    /**
     * Delegates to the router.
     *
     * @return the number of receivers reported by the router
     */
    public int getNumberOfReceivers() {
        return router.getNumberOfReceivers();
    }

    /**
     * @return the identifier this channel was created with
     */
    public long getChannelID() {
        return channelID;
    }

    /**
     * @return the {@code recoverable} flag this channel was created with
     */
    public boolean isRecoverable() {
        return recoverable;
    }

    /**
     * @return the {@code acceptReliableMessages} flag this channel was created with
     */
    public boolean acceptReliableMessages() {
        return acceptReliableMessages;
    }

    /**
     * Browses all undelivered in-memory messages, without filtering.
     *
     * @return the list produced by {@code browse(Filter)} with a {@code null} filter
     */
    public List browse() {
        return browse((Filter) null);
    }

    /**
     * Returns the messages of all undelivered in-memory references accepted by the filter.
     *
     * @param filter the filter to apply, or {@code null} to accept everything
     * @return a new list of {@code Message} objects, in the order returned by {@code undelivered}
     */
    public List browse(Filter filter) {
        if (trace) {
            if (filter == null) {
                log.trace(this + " browse");
            } else {
                log.trace(this + " browse, filter = " + filter);
            }
        }
        synchronized (refLock) {
            List references = undelivered(filter);
            ArrayList<Message> messages = new ArrayList<Message>(references.size());
            for (Object each : references) {
                messages.add(((MessageReference) each).getMessage());
            }
            return messages;
        }
    }

    /**
     * Prompts delivery of in-memory references. Does nothing when the router reports no
     * receivers; otherwise marks receivers as ready and runs the delivery loop under the
     * reference lock.
     *
     * @throws IllegalStateException if the channel has been closed
     */
    public void deliver() {
        checkClosed();
        if (router.getNumberOfReceivers() <= 0) {
            return;
        }
        synchronized (refLock) {
            receiversReady = true;
            deliverInternal();
        }
    }

    /**
     * Closes the channel: clears and drops the router (if still present) and cancels every
     * scheduled delivery. A {@code null} router is what {@code checkClosed()} reports as "closed".
     */
    public void close() {
        if (router != null) {
            router.clear();
            router = null;
        }
        clearAllScheduledDeliveries();
    }

    /**
     * Drains the channel: every in-memory reference is removed and acknowledged
     * non-transactionally, the delivering count is reset to zero, and all scheduled deliveries
     * are cancelled.
     *
     * @throws Throwable if removing or acknowledging a reference fails
     */
    public void removeAllReferences() throws Throwable {
        // Log message typo fixed ("remnoving" -> "removing").
        log.debug(this + " removing all references");
        synchronized (this.refLock) {
            MessageReference ref;
            while ((ref = this.removeFirstInMemory()) != null) {
                SimpleDelivery del = new SimpleDelivery(this, ref);
                del.acknowledge(null);
            }
            // Reset unconditionally after the acknowledgements above.
            this.deliveringCount.set(0);
        }
        this.clearAllScheduledDeliveries();
    }

    /**
     * Collects the in-memory references whose message is accepted by the filter.
     *
     * @param filter the filter to apply, or {@code null} to accept every reference
     * @return a new list of {@code MessageReference} objects
     */
    public List undelivered(Filter filter) {
        ArrayList<MessageReference> matches = new ArrayList<MessageReference>();
        synchronized (refLock) {
            for (Iterator it = messageRefs.getAll().iterator(); it.hasNext(); ) {
                MessageReference candidate = (MessageReference) it.next();
                boolean accepted = filter == null || filter.accept(candidate.getMessage());
                if (accepted) {
                    matches.add(candidate);
                } else if (trace) {
                    log.trace(this + ": " + candidate + " NOT accepted by filter so won't add to list");
                }
            }
        }
        if (trace) {
            log.trace(this + ": undelivered() returns a list of " + matches.size() + " undelivered memory messages");
        }
        return matches;
    }

    /**
     * @return the sum of in-memory references, references currently being delivered and
     *         scheduled deliveries, computed under the reference lock
     */
    public int getMessageCount() {
        synchronized (refLock) {
            int inMemory = messageRefs.size();
            int delivering = getDeliveringCount();
            int scheduled = getScheduledCount();
            return inMemory + delivering + scheduled;
        }
    }

    /**
     * @return the current value of the delivering counter
     */
    public int getDeliveringCount() {
        return deliveringCount.get();
    }

    /**
     * @return the number of pending scheduled deliveries, read while holding the set's monitor
     */
    public int getScheduledCount() {
        synchronized (scheduledDeliveries) {
            return scheduledDeliveries.size();
        }
    }

    /**
     * Marks the channel as active so that {@code handle} processes incoming references again.
     */
    public void activate() {
        synchronized (refLock) {
            active = true;
        }
    }

    /**
     * Marks the channel as inactive; {@code handle} returns {@code null} while inactive.
     */
    public void deactivate() {
        synchronized (refLock) {
            active = false;
        }
    }

    /**
     * @return the active flag, read under the reference lock
     */
    public boolean isActive() {
        synchronized (refLock) {
            return active;
        }
    }

    /**
     * Turns in-memory references back into deliveries for the given message ids.
     * <p>
     * The in-memory list is scanned once, front to back, with a single cursor shared by all ids:
     * the search for each id resumes where the previous one stopped. Each matching reference is
     * removed from the list and wrapped in a {@code SimpleDelivery}. An id that is not found
     * before the cursor is exhausted is logged as a warning and skipped.
     *
     * @param messageIds list of {@code Long} message ids
     * @return a new list of {@code SimpleDelivery} objects, one per id found
     */
    public List recoverDeliveries(List messageIds) {
        Iterator ids = messageIds.iterator();
        ArrayList<SimpleDelivery> recovered = new ArrayList<SimpleDelivery>();
        synchronized (refLock) {
            ListIterator cursor = messageRefs.iterator();
            while (ids.hasNext()) {
                Long wanted = (Long) ids.next();
                MessageReference match = null;
                while (match == null && cursor.hasNext()) {
                    MessageReference candidate = (MessageReference) cursor.next();
                    if (candidate.getMessage().getMessageID() == wanted.longValue()) {
                        match = candidate;
                    }
                }
                if (match == null) {
                    log.warn(this + " cannot find reference " + wanted + " (Might be paged!)");
                    continue;
                }
                // The cursor's last returned element is the match, so this removes exactly it.
                cursor.remove();
                recovered.add(new SimpleDelivery(this, match));
            }
        }
        return recovered;
    }

    /**
     * @return the configured maximum size ({@code -1} means unlimited), read under the reference lock
     */
    public int getMaxSize() {
        synchronized (refLock) {
            return maxSize;
        }
    }

    /**
     * Changes the maximum size. The change is refused (with a warning) when the new limit is
     * not {@code -1} and the channel already holds more messages than the new limit.
     *
     * @param newSize the new maximum, or {@code -1} for no limit
     */
    public void setMaxSize(int newSize) {
        synchronized (refLock) {
            int count = getMessageCount();
            boolean tooSmall = newSize != -1 && count > newSize;
            if (!tooSmall) {
                maxSize = newSize;
                return;
            }
            log.warn("Cannot set maxSize to " + newSize + " since there are already " + count + " refs");
        }
    }

    /**
     * @return the current value of the messages-added counter
     */
    public int getMessagesAdded() {
        return messagesAdded.get();
    }

    /**
     * @return the number of references currently held in the in-memory list
     */
    public int memoryRefCount() {
        synchronized (refLock) {
            return messageRefs.size();
        }
    }

    /**
     * @return {@code "ChannelSupport[<channelID>]"}
     */
    public String toString() {
        StringBuilder text = new StringBuilder("ChannelSupport[");
        text.append(channelID).append("]");
        return text.toString();
    }

    /**
     * Cancels every pending scheduled-delivery timeout and empties the set, all while holding
     * the set's monitor. The timeouts are cancelled from a copy of the set.
     */
    protected void clearAllScheduledDeliveries() {
        synchronized (scheduledDeliveries) {
            HashSet snapshot = new HashSet(scheduledDeliveries);
            for (Object pending : snapshot) {
                ((Timeout) pending).cancel();
            }
            scheduledDeliveries.clear();
        }
    }

    /**
     * Puts a cancelled reference back at the front of the in-memory list, at the priority of
     * its message.
     *
     * @param ref the reference to re-add
     * @throws Exception declared for overriding implementations; nothing here throws a checked exception
     */
    protected void cancelInternal(MessageReference ref) throws Exception {
        final boolean tracing = trace;
        if (tracing) {
            log.trace(this + " cancelling " + ref + " in memory");
        }
        synchronized (refLock) {
            messageRefs.addFirst(ref, ref.getMessage().getPriority());
        }
        if (tracing) {
            log.trace(this + " added " + ref + " back into state");
        }
    }

    /**
     * Pushes in-memory references through the router until there is nothing left to try or the
     * router returns no delivery.
     * <p>
     * The loop starts by peeking at the head of the list. The first time a delivery comes back
     * with {@code isSelectorAccepted() == false}, the loop switches to walking the list with an
     * iterator so that later references can still be tried. Every accepted delivery removes the
     * reference from the list and increments the delivering count. {@code receiversReady} is
     * updated from each router result, and delivery stops as soon as the router returns
     * {@code null}. Any {@code Throwable} is logged and swallowed.
     * <p>
     * Callers in this class invoke this method while holding {@code refLock}.
     */
    protected void deliverInternal() {
        if (this.trace) {
            log.trace(this + " was prompted delivery");
        }
        try {
            if (!this.receiversReady) {
                return;
            }
            // null means "peek at the head"; non-null means "walk the list".
            ListIterator iter = null;
            MessageReference ref;
            while ((ref = this.nextReference(iter)) != null) {
                if (this.trace) {
                    log.trace(this + " pushing " + ref);
                }
                Delivery del = this.router.handle(this, ref, null);
                this.receiversReady = del != null;
                if (del == null) {
                    if (this.trace) {
                        log.trace(this + " got no delivery for " + ref + " so no receiver got the message. Stopping delivery.");
                    }
                    return;
                }
                if (!del.isSelectorAccepted()) {
                    if (iter == null) {
                        iter = this.messageRefs.iterator();
                        // The head reference was just offered via peekFirst() and rejected. Step
                        // past it so it is not offered to the router a second time.
                        // Assumes the iterator starts at the element peekFirst() returned,
                        // which holds while the caller owns refLock -- confirm.
                        if (iter.hasNext()) {
                            iter.next();
                        }
                    }
                    continue;
                }
                if (this.trace) {
                    log.trace(this + ": " + del + " returned for message " + ref);
                }
                this.removeReference(iter);
                this.deliveringCount.increment();
            }
            if (this.trace) {
                log.trace(this + " no more refs to deliver ");
            }
        }
        catch (Throwable t) {
            log.error(this + " Failed to deliver", t);
        }
    }

    /**
     * Offers a single reference (one whose scheduled delivery time has arrived) to the router,
     * under the reference lock. {@code receiversReady} is updated from the router result.
     *
     * @param ref the reference to deliver
     * @return {@code true} only when the router returned a delivery that was selector-accepted
     *         (the delivering count is incremented in that case); {@code false} when no delivery
     *         was returned, the selector rejected it, or anything was thrown (logged)
     */
    protected boolean deliverScheduled(MessageReference ref) {
        try {
            synchronized (refLock) {
                if (trace) {
                    log.trace(this + " pushing " + ref);
                }
                Delivery del = router.handle(this, ref, null);
                receiversReady = del != null;
                if (del == null) {
                    if (trace) {
                        log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete");
                    }
                    return false;
                }
                if (!del.isSelectorAccepted()) {
                    return false;
                }
                if (trace) {
                    log.trace(this + ": " + del + " returned for message:" + ref);
                }
                deliveringCount.increment();
                return true;
            }
        }
        catch (Throwable t) {
            log.error(this + " Failed to deliver", t);
            return false;
        }
    }

    /**
     * Accepts a reference into this channel, either immediately or as part of a transaction.
     * <p>
     * Non-transactional ({@code tx == null}): the reference is optionally persisted, then either
     * scheduled for later delivery or added to the in-memory list followed by a delivery attempt.
     * Transactional: the reference is registered with this channel's transaction callback (it is
     * added to memory after commit) and optionally persisted within the transaction. A reliable
     * message on a channel that does not accept reliable messages dooms the transaction instead.
     *
     * @param sender not used by this implementation
     * @param ref the incoming reference; the channel works on its own copy
     * @param tx the transaction, or {@code null}
     * @param persist whether reliable references on a recoverable channel are written to the
     *        persistence manager
     * @return a {@code SimpleDelivery} for the channel's copy of the reference, or {@code null}
     *         when {@code ref} is {@code null}, the channel is full, a reliable message is
     *         rejected non-transactionally, or handling failed (the failure is logged)
     */
    protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref, Transaction tx, boolean persist) {
        if (ref == null) {
            return null;
        }
        if (this.trace) {
            log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx));
        }
        if (this.maxSize != -1 && this.getMessageCount() >= this.maxSize) {
            log.warn(this + " has reached maximum size, " + ref + " will be dropped");
            return null;
        }
        // Reject before copying. Previously this check ran after ref.copy() and returned null
        // without the releaseMemoryReference() call that the failure path below performs, so
        // the copy was never released.
        // NOTE(review): assumes the copy refers to the same message as the original -- confirm.
        if (tx == null && ref.getMessage().isReliable() && !this.acceptReliableMessages) {
            log.error("Cannot handle reliable message " + ref + " because the channel has a non-recoverable state!");
            return null;
        }
        // From here on the channel works on its own copy, released again if handling fails.
        ref = ref.copy();
        try {
            if (tx == null) {
                if (persist && ref.getMessage().isReliable() && this.recoverable) {
                    if (this.trace) {
                        log.trace(this + " adding " + ref + " to database non-transactionally");
                    }
                    this.pm.addReference(this.channelID, ref, null);
                }
                // A reference with a future scheduled delivery time is not added to memory now.
                if (!this.checkAndSchedule(ref)) {
                    synchronized (this.refLock) {
                        this.addReferenceInMemory(ref);
                        this.deliverInternal();
                    }
                }
            } else {
                if (this.trace) {
                    log.trace(this + " adding " + ref + " to state in transaction: " + tx);
                }
                if (ref.getMessage().isReliable() && !this.acceptReliableMessages) {
                    // NOTE(review): the copy is not registered with the callback on this path,
                    // so afterRollback() will not release it -- confirm whether that is intended.
                    log.warn(this + " cannot handle reliable messages, dooming the transaction");
                    tx.setRollbackOnly();
                } else {
                    this.getCallback(tx).addRef(ref);
                    if (this.trace) {
                        log.trace(this + " added transactionally " + ref + " in memory");
                    }
                }
                if (persist && ref.getMessage().isReliable() && this.recoverable) {
                    if (this.trace) {
                        log.trace(this + " adding " + ref + " in transaction: " + tx);
                    }
                    this.pm.addReference(this.channelID, ref, tx);
                }
            }
            this.messagesAdded.increment();
        }
        catch (Throwable t) {
            log.error("Failed to handle message", t);
            ref.releaseMemoryReference();
            return null;
        }
        return new SimpleDelivery(this, ref, true);
    }

    /**
     * Schedules a timed delivery when the reference's scheduled delivery time lies in the future.
     *
     * @param ref the reference to inspect
     * @return {@code true} if a timeout was scheduled and recorded in {@code scheduledDeliveries};
     *         {@code false} if the scheduled delivery time is not later than the current time
     */
    protected boolean checkAndSchedule(MessageReference ref) {
        if (ref.getScheduledDeliveryTime() <= System.currentTimeMillis()) {
            return false;
        }
        if (trace) {
            log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime());
        }
        synchronized (scheduledDeliveries) {
            DeliverRefTimeoutTarget target = null;
            Timeout timeout = MessagingTimeoutFactory.instance.getFactory().schedule(ref.getScheduledDeliveryTime(), target = new DeliverRefTimeoutTarget(ref));
            scheduledDeliveries.add(timeout);
        }
        return true;
    }

    /**
     * Acknowledges a delivery.
     * <p>
     * Transactional: the delivery is registered with this channel's transaction callback and,
     * for a reliable message on a recoverable channel, its reference is removed from the
     * persistence manager within the transaction. Non-transactional: the reference is optionally
     * removed from the persistence manager, its memory reference is released and the delivering
     * count is decremented.
     * <p>
     * NOTE(review): {@code persist} is only consulted on the non-transactional path -- confirm
     * that this asymmetry is intended.
     *
     * @param d the delivery to acknowledge
     * @param tx the transaction, or {@code null}
     * @param persist whether to touch the persistence manager (non-transactional path only)
     * @throws Exception on persistence failure
     */
    protected void acknowledgeInternal(Delivery d, Transaction tx, boolean persist) throws Exception {
        if (tx != null) {
            getCallback(tx).addDelivery(d);
            if (trace) {
                log.trace(this + " added " + d + " to memory on transaction " + tx);
            }
            if (recoverable && d.getReference().getMessage().isReliable()) {
                pm.removeReference(channelID, d.getReference(), tx);
            }
            return;
        }
        if (persist && recoverable && d.getReference().getMessage().isReliable()) {
            pm.removeReference(channelID, d.getReference(), null);
        }
        d.getReference().releaseMemoryReference();
        deliveringCount.decrement();
    }

    /**
     * Returns this channel's callback for the transaction, creating and registering one on
     * first use.
     *
     * @param tx the transaction
     * @return the callback keyed by this channel in {@code tx}
     */
    protected InMemoryCallback getCallback(Transaction tx) {
        InMemoryCallback existing = (InMemoryCallback) tx.getCallback(this);
        if (existing != null) {
            return existing;
        }
        InMemoryCallback created = new InMemoryCallback();
        tx.addCallback(created, this);
        return created;
    }

    /**
     * Removes and returns the first reference of the in-memory list.
     *
     * @return whatever {@code messageRefs.removeFirst()} returns, cast to {@code MessageReference}
     * @throws Exception declared for overriding implementations
     */
    protected MessageReference removeFirstInMemory() throws Exception {
        return (MessageReference) messageRefs.removeFirst();
    }

    /**
     * Appends a reference to the in-memory list at the priority of its message.
     *
     * @param ref the reference to add
     * @throws IllegalStateException if the message is reliable and this channel does not accept
     *         reliable messages
     * @throws Exception declared for overriding implementations
     */
    protected void addReferenceInMemory(MessageReference ref) throws Exception {
        boolean reliable = ref.getMessage().isReliable();
        if (reliable && !acceptReliableMessages) {
            throw new IllegalStateException("Reliable reference " + ref + " cannot be added to non-recoverable state");
        }
        messageRefs.addLast(ref, ref.getMessage().getPriority());
        if (trace) {
            log.trace(this + " added " + ref + " non-transactionally in memory");
        }
    }

    /**
     * Removes the reference that was just delivered: the iterator's current element when an
     * iterator is in use, otherwise the head of the in-memory list.
     *
     * @param iter the iterator used by the delivery loop, or {@code null} in head-peek mode
     * @throws Exception if removing the head fails
     */
    private void removeReference(ListIterator iter) throws Exception {
        synchronized (refLock) {
            if (iter != null) {
                if (trace) {
                    log.trace(this + " removed current message from iterator");
                }
                iter.remove();
                return;
            }
            if (trace) {
                log.trace(this + " removing first ref in memory");
            }
            removeFirstInMemory();
        }
    }

    /**
     * Picks the next reference for the delivery loop.
     *
     * @param iter the iterator in use, or {@code null} to peek at the head of the list
     * @return the head of the list (not removed) when {@code iter} is {@code null}; otherwise the
     *         iterator's next element, or {@code null} when the iterator is exhausted
     * @throws Throwable declared by the original signature
     */
    private MessageReference nextReference(ListIterator iter) throws Throwable {
        if (iter == null) {
            return (MessageReference) messageRefs.peekFirst();
        }
        if (iter.hasNext()) {
            return (MessageReference) iter.next();
        }
        return null;
    }

    /**
     * Hook with an empty implementation in this class; it is not called from within this class.
     *
     * @param reference the reference about to be stored (unused here)
     */
    protected void processMessageBeforeStorage(MessageReference reference) {
        // Intentionally empty.
    }

    /**
     * Fails when the channel has been closed, i.e. when the router is {@code null}.
     *
     * @throws IllegalStateException if the router is {@code null}
     */
    protected void checkClosed() {
        if (router != null) {
            return;
        }
        throw new IllegalStateException(this + " closed");
    }

    /**
     * Timeout target that delivers one scheduled reference when its timeout fires.
     */
    private class DeliverRefTimeoutTarget
    implements TimeoutTarget {
        /** The reference whose delivery was postponed. */
        private MessageReference ref;

        public DeliverRefTimeoutTarget(MessageReference ref) {
            this.ref = ref;
        }

        /**
         * Removes the timeout from the channel's scheduled set, clears the reference's scheduled
         * delivery time and tries to deliver it if the router has receivers. When it is not
         * delivered, the reference is put back into the in-memory list via {@code cancelInternal}.
         *
         * @param timeout the timeout that fired
         * @throws IllegalStateException if the timeout was not in the scheduled set
         */
        public void timedOut(Timeout timeout) {
            if (ChannelSupport.this.trace) {
                log.trace("Scheduled delivery timeout " + this.ref);
            }
            synchronized (ChannelSupport.this.scheduledDeliveries) {
                if (!ChannelSupport.this.scheduledDeliveries.remove(timeout)) {
                    throw new IllegalStateException("Failed to remove timeout " + timeout);
                }
            }
            this.ref.setScheduledDeliveryTime(0L);
            boolean delivered = ChannelSupport.this.router.getNumberOfReceivers() > 0
                    && ChannelSupport.this.deliverScheduled(this.ref);
            if (delivered) {
                if (ChannelSupport.this.trace) {
                    log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + this.ref);
                }
                return;
            }
            try {
                ChannelSupport.this.cancelInternal(this.ref);
            }
            catch (Exception e) {
                log.error("Failed to cancel", e);
            }
        }
    }

    /**
     * Per-transaction callback that collects the references added and the deliveries
     * acknowledged on this channel inside one transaction, and applies them to the in-memory
     * state once the transaction completes.
     */
    private class InMemoryCallback
    implements TxCallback {
        /** References handled in the transaction; added to memory after commit, released after rollback. */
        private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
        /** Deliveries acknowledged in the transaction; released after commit. */
        private final List<Delivery> deliveriesToRemove = new ArrayList<Delivery>();

        private InMemoryCallback() {
        }

        private void addRef(MessageReference ref) {
            this.refsToAdd.add(ref);
        }

        private void addDelivery(Delivery del) {
            this.deliveriesToRemove.add(del);
        }

        public void beforePrepare() {
        }

        public void beforeCommit(boolean onePhase) {
        }

        public void beforeRollback(boolean onePhase) {
        }

        public void afterPrepare() {
        }

        /**
         * Applies the transaction to memory: adds every collected reference to the in-memory
         * list, releases every acknowledged delivery (decrementing the delivering count), then
         * prompts delivery under the reference lock.
         *
         * @param onePhase not used by this implementation
         * @throws Exception wrapping any failure, after logging it
         */
        public void afterCommit(boolean onePhase) throws Exception {
            // The decompiled form reused a single "Object i" slot for both iterators and the
            // lock object, which does not compile. Each role is typed separately here.
            try {
                for (MessageReference ref : this.refsToAdd) {
                    if (ChannelSupport.this.trace) {
                        log.trace(this + ": adding " + ref + " to non-recoverable state");
                    }
                    try {
                        synchronized (ChannelSupport.this.refLock) {
                            ChannelSupport.this.addReferenceInMemory(ref);
                        }
                    }
                    catch (Throwable t) {
                        throw new TransactionException("Failed to add reference", t);
                    }
                }
                for (Delivery del : this.deliveriesToRemove) {
                    if (ChannelSupport.this.trace) {
                        log.trace(this + " removing " + del + " after commit");
                    }
                    del.getReference().releaseMemoryReference();
                    ChannelSupport.this.deliveringCount.decrement();
                }
                synchronized (ChannelSupport.this.refLock) {
                    ChannelSupport.this.deliverInternal();
                }
            }
            catch (Throwable t) {
                log.error("failed to commit", t);
                throw new Exception("Failed to commit", t);
            }
        }

        /**
         * Releases the memory reference of every reference collected in the transaction.
         *
         * @param onePhase not used by this implementation
         * @throws Exception declared by the callback contract
         */
        public void afterRollback(boolean onePhase) throws Exception {
            for (MessageReference ref : this.refsToAdd) {
                if (ChannelSupport.this.trace) {
                    log.trace(this + " releasing memory " + ref + " after rollback");
                }
                ref.releaseMemoryReference();
            }
        }

        public String toString() {
            return ChannelSupport.this + ".InMemoryCallback[" + Integer.toHexString(this.hashCode()) + "]";
        }
    }
}

