/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.messaging.core.plugin.postoffice.cluster;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
import org.jboss.messaging.core.plugin.postoffice.cluster.PostOfficeInternal;
import org.jboss.messaging.core.plugin.postoffice.cluster.PullMessageResultRequest;
import org.jboss.messaging.core.plugin.postoffice.cluster.PullMessagesRequest;
import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
import org.jboss.messaging.core.plugin.postoffice.cluster.RemoteQueueStub;
import org.jboss.messaging.core.plugin.postoffice.cluster.TransactionId;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Future;

public class LocalClusteredQueue
extends PagingFilteredQueue
implements ClusteredQueue {
    /** Logger shared by every instance of this class and its inner runnables. */
    private static final Logger log = Logger.getLogger(LocalClusteredQueue.class);
    /** Whether trace logging was enabled when this instance was created (sampled once, not refreshed). */
    private boolean trace = log.isTraceEnabled();
    /** Post office that supplies the executor and is used to send requests and hold transactions. */
    private PostOfficeInternal office;
    /** Reference count recorded by the last {@link #getStats()} call that observed a change. */
    private volatile int lastCount;
    /** Remote queue that pull requests are sent to; {@code null} when none has been set. */
    private volatile RemoteQueueStub pullQueue;
    /** Node id supplied at construction; returned by {@link #getNodeId()} and placed in outgoing requests. */
    private int nodeId;
    /** Repository used to create transactions for the pull protocol. */
    private TransactionRepository tr;
    /** Executor obtained from the post office; every runnable declared in this class is submitted to it. */
    private Executor executor;

    public LocalClusteredQueue(ClusteredPostOffice office, int nodeId, String name, long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int maxSize, Filter filter, TransactionRepository tr, int fullSize, int pageSize, int downCacheSize) {
        super(name, id, ms, pm, acceptReliableMessages, recoverable, maxSize, filter, fullSize, pageSize, downCacheSize);
        this.nodeId = nodeId;
        this.tr = tr;
        this.office = (PostOfficeInternal)office;
        this.executor = this.office.getPooledExecutor();
    }

    public LocalClusteredQueue(ClusteredPostOffice office, int nodeId, String name, long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int maxSize, Filter filter, TransactionRepository tr) {
        super(name, id, ms, pm, acceptReliableMessages, recoverable, maxSize, filter);
        this.nodeId = nodeId;
        this.tr = tr;
        this.office = (PostOfficeInternal)office;
        this.executor = this.office.getPooledExecutor();
    }

    /**
     * Sets the remote queue that pull requests are sent to.
     *
     * @param queue the remote queue stub; {@code null} disables sending pull requests
     */
    public void setPullQueue(RemoteQueueStub queue) {
        pullQueue = queue;
    }

    /**
     * Returns the remote queue currently configured for pulling.
     *
     * @return the remote queue stub, or {@code null} if none is set
     */
    public RemoteQueueStub getPullQueue() {
        return pullQueue;
    }

    /**
     * Reports the current reference count, but only when it differs from the
     * count remembered from the previous report.
     *
     * @return a new {@code QueueStats} carrying the queue name and the current
     *         count, or {@code null} when the count has not changed
     */
    public QueueStats getStats() {
        int current = getRefCount();
        if (current == lastCount) {
            // Nothing changed since the last report.
            return null;
        }
        lastCount = current;
        return new QueueStats(name, current);
    }

    /**
     * Indicates that this queue is local.
     *
     * @return always {@code true}
     */
    public boolean isLocal() {
        return true;
    }

    /**
     * Returns the node id this queue was constructed with.
     *
     * @return the node id
     */
    public int getNodeId() {
        return nodeId;
    }

    /**
     * Describes this queue as {@code LocalClusteredQueue[<channel id>/<name>]}.
     *
     * @return the textual description used in log messages
     */
    public String toString() {
        return "LocalClusteredQueue[" + getChannelID() + "/" + getName() + "]";
    }

    /**
     * Handles a message reference that arrived from the cluster.
     *
     * When a filter is set and it rejects the message, a {@code SimpleDelivery}
     * is returned without handing the reference to the queue. Otherwise the
     * closed-state check runs and the reference is passed to
     * {@code handleInternal}.
     *
     * @param ref the reference to handle
     * @return the delivery produced by {@code handleInternal}, or the
     *         {@code SimpleDelivery} created for a filter rejection
     * @throws Exception propagated from the filter, the closed check or {@code handleInternal}
     */
    public Delivery handleFromCluster(MessageReference ref) throws Exception {
        if (trace) {
            log.trace(this + " handling " + ref + " from cluster");
        }
        boolean accepted = filter == null || filter.accept(ref.getMessage());
        if (accepted) {
            checkClosed();
            return handleInternal(null, ref, null, false);
        }
        // Rejected by the filter: answer with a delivery built from the fixed flags (true, false).
        SimpleDelivery rejected = new SimpleDelivery(this, ref, true, false);
        if (trace) {
            log.trace(this + " " + ref + " rejected by filter");
        }
        return rejected;
    }

    /**
     * Acknowledges a delivery on behalf of the cluster by delegating to
     * {@code acknowledgeInternal} with no transaction and a {@code false} flag.
     *
     * @param d the delivery to acknowledge
     * @throws Throwable propagated from {@code acknowledgeInternal}
     */
    public void acknowledgeFromCluster(Delivery d) throws Throwable {
        acknowledgeInternal(d, null, false);
    }

    /**
     * Schedules processing of a message that was pulled from a remote queue.
     *
     * The work itself is done by a {@code MessagePullResultRunnable} submitted
     * to the executor; this method only enqueues it.
     *
     * @param remoteQueue      the remote queue the message came from
     * @param message          the pulled message
     * @param holdingTxId      id passed back to the remote node after a transactional commit
     * @param failBeforeCommit makes the task throw just before commit (looks like a test hook)
     * @param failAfterCommit  makes the task throw just after commit (looks like a test hook)
     * @throws Exception if the executor refuses the task
     */
    public void handlePullMessagesResult(RemoteQueueStub remoteQueue, Message message, long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) throws Exception {
        executor.execute(new MessagePullResultRunnable(remoteQueue, message, holdingTxId, failBeforeCommit, failAfterCommit));
    }

    /**
     * Schedules the handling of a pull request received from another node.
     *
     * The work itself is done by a {@code MessagePullRequestRunnable} submitted
     * to the executor; this method only enqueues it.
     *
     * @param returnNodeId node the response is sent to
     * @param number       requested number of messages (stored by the task; the
     *                     task as written hands over at most one message)
     * @param txId         transaction id used in the response and when holding the transaction
     * @param tx           the originating request, which receives the reliable delivery
     * @throws Exception if the executor refuses the task
     */
    public void handleGetDeliveriesRequest(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) throws Exception {
        executor.execute(new MessagePullRequestRunnable(returnNodeId, number, txId, tx));
    }

    /**
     * Indicates that this queue is clustered.
     *
     * @return always {@code true}
     */
    public boolean isClustered() {
        return true;
    }

    /**
     * Returns the number of message references currently held in memory.
     *
     * The count is normally read by a {@code GetRefCountRunnable} submitted to
     * the executor, and this method waits on the {@code Future} for the value
     * that task publishes.
     *
     * If submitting the task is interrupted, the task has presumably not been
     * queued and the {@code Future} would never be completed. Waiting on it
     * could block the caller indefinitely. In that case the interrupt status is
     * restored and the count is read directly while holding {@code refLock}.
     *
     * @return the size of the in-memory reference list
     */
    public int getRefCount() {
        Future result = new Future();
        try {
            this.executor.execute(new GetRefCountRunnable(result));
        }
        catch (InterruptedException e) {
            log.warn("Thread interrupted", e);
            // Do not swallow the interrupt: let callers further up observe it.
            Thread.currentThread().interrupt();
            // Fall back to a direct read instead of waiting on a result that will not arrive.
            synchronized (this.refLock) {
                return this.messageRefs.size();
            }
        }
        return (Integer)result.getResult();
    }

    /**
     * Merges the contents of a remote queue's channel into this queue.
     *
     * While holding {@code refLock} the method flushes the down cache, asks the
     * persistence manager to {@code mergeAndLoad} from the remote channel id
     * into this channel id, loads the returned references with {@code doLoad},
     * and finally triggers delivery.
     *
     * @param remoteQueue the remote queue whose channel is merged into this one
     * @throws Exception propagated from the cache flush, the persistence manager or the load
     */
    public void mergeIn(RemoteQueueStub remoteQueue) throws Exception {
        if (trace) {
            log.trace("Merging queue " + remoteQueue + " into " + this);
        }
        synchronized (refLock) {
            flushDownCache();
            // Third argument is the gap between fullSize and what is already in memory
            // (presumably the number of references that may still be loaded).
            PersistenceManager.InitialLoadInfo loaded = pm.mergeAndLoad(
                    remoteQueue.getChannelID(),
                    channelID,
                    fullSize - messageRefs.size(),
                    firstPagingOrder,
                    nextPagingOrder);
            if (trace) {
                log.trace("Loaded " + loaded.getRefInfos().size() + " refs");
            }
            doLoad(loaded);
            deliverInternal();
        }
    }

    /**
     * Runs the inherited delivery and then, if receivers are ready and a pull
     * queue is configured, schedules a pull request to that queue.
     *
     * A failure to schedule the pull request is logged and not propagated.
     */
    protected void deliverInternal() {
        super.deliverInternal();
        if (!receiversReady || pullQueue == null) {
            return;
        }
        try {
            sendPullMessage();
        }
        catch (Exception e) {
            log.error("Failed to send pull message", e);
        }
    }

    /**
     * Submits a {@code SendPullRequestRunnable} for the current pull queue, or
     * does nothing when no pull queue is set.
     *
     * The volatile field is copied into a local first so the null check and the
     * task construction see the same value.
     *
     * @throws Exception if the executor refuses the task
     */
    private void sendPullMessage() throws Exception {
        RemoteQueueStub target = pullQueue;
        if (target != null) {
            executor.execute(new SendPullRequestRunnable(target));
        }
    }

    /**
     * Executor task that adds a message pulled from a remote queue to the
     * enclosing local queue and acknowledges it against the remote queue.
     */
    private class MessagePullResultRunnable
    implements Runnable {
        private RemoteQueueStub remoteQueue;
        private Message message;
        private long holdingTxId;
        private boolean failBeforeCommit;
        private boolean failAfterCommit;

        private MessagePullResultRunnable(RemoteQueueStub remoteQueue, Message message, long holdingTxId, boolean failBeforeCommit, boolean failAfterCommit) {
            this.remoteQueue = remoteQueue;
            this.message = message;
            this.holdingTxId = holdingTxId;
            this.failBeforeCommit = failBeforeCommit;
            this.failAfterCommit = failAfterCommit;
        }

        /**
         * Handles the pulled message.
         *
         * A transaction is used only when the message is reliable and the
         * queue is recoverable. In that case the message is flagged as
         * persisted, the transaction is committed after the acknowledgement,
         * and a follow-up {@code PullMessagesRequest} carrying the holding
         * transaction id is sent to the remote node.
         *
         * The two fail flags throw an exception immediately before and
         * immediately after the commit point; they look like test hooks.
         * Any {@code Throwable} is logged and not rethrown. No rollback of the
         * transaction is attempted in this method.
         */
        public void run() {
            final LocalClusteredQueue queue = LocalClusteredQueue.this;
            try {
                Transaction tx = null;
                boolean transactional = this.message.isReliable() && queue.isRecoverable();
                if (transactional) {
                    tx = queue.tr.createTransaction();
                    this.message.setPersisted(true);
                }
                MessageReference ref = null;
                try {
                    ref = queue.ms.reference(this.message);
                    Delivery handled = queue.handleInternal(null, ref, tx, true);
                    boolean accepted = handled != null && handled.isSelectorAccepted();
                    if (!accepted) {
                        throw new IllegalStateException("Queue did not accept reference!");
                    }
                }
                finally {
                    // Release the memory reference whether or not the queue accepted it.
                    if (ref != null) {
                        ref.releaseMemoryReference();
                    }
                }
                // Acknowledge against the remote queue, inside the transaction when there is one.
                new SimpleDelivery(this.remoteQueue, ref).acknowledge(tx);
                if (this.failBeforeCommit) {
                    throw new Exception("Test failure before commit");
                }
                if (transactional) {
                    tx.commit();
                }
                if (this.failAfterCommit) {
                    throw new Exception("Test failure after commit");
                }
                if (transactional) {
                    PullMessagesRequest followUp = new PullMessagesRequest(queue.nodeId, this.holdingTxId);
                    queue.office.asyncSendRequest(followUp, this.remoteQueue.getNodeId());
                }
            }
            catch (Throwable e) {
                log.error("Failed to handle pulled message", e);
            }
        }
    }

    /**
     * Executor task that answers a pull request from another node by handing
     * over the first in-memory message of the enclosing queue, if any.
     */
    private class MessagePullRequestRunnable
    implements Runnable {
        int returnNodeId;
        int number;
        TransactionId txId;
        PullMessagesRequest tx;

        public MessagePullRequestRunnable(int returnNodeId, int number, TransactionId txId, PullMessagesRequest tx) {
            this.returnNodeId = returnNodeId;
            this.number = number;
            this.txId = txId;
            this.tx = tx;
        }

        /**
         * Takes at most one reference from the queue and sends it to the
         * requesting node.
         *
         * A reference is removed only when {@code receiversReady} is false,
         * and the removal happens under {@code refLock}. If the message is
         * reliable and the queue is recoverable, the delivery is attached to
         * the request and the transaction is held by the post office;
         * otherwise the delivery is acknowledged immediately. Any
         * {@code Throwable} is logged and not rethrown.
         */
        public void run() {
            final LocalClusteredQueue queue = LocalClusteredQueue.this;
            try {
                Delivery del = null;
                if (!queue.receiversReady) {
                    synchronized (queue.refLock) {
                        MessageReference head = queue.removeFirstInMemory();
                        if (head != null) {
                            del = new SimpleDelivery(queue, head);
                            queue.deliveringCount.increment();
                        }
                    }
                }
                if (queue.trace) {
                    log.trace("PullMessagesRunnable got " + del);
                }
                if (del == null) {
                    // Nothing to hand over; no response is sent in this case.
                    return;
                }
                PullMessageResultRequest response = new PullMessageResultRequest(queue.nodeId, this.txId.getTxId(), queue.name, del.getReference().getMessage());
                if (del.getReference().getMessage().isReliable() && queue.recoverable) {
                    this.tx.setReliableDelivery(del);
                    queue.office.holdTransaction(this.txId, this.tx);
                } else {
                    del.acknowledge(null);
                }
                queue.office.asyncSendRequest(response, this.returnNodeId);
            }
            catch (Throwable e) {
                log.error("Failed to get deliveries", e);
            }
        }
    }

    /**
     * Executor task that sends a pull request for a single message to the
     * node owning the given remote queue.
     */
    private class SendPullRequestRunnable
    implements Runnable {
        private RemoteQueueStub theQueue;

        private SendPullRequestRunnable(RemoteQueueStub theQueue) {
            this.theQueue = theQueue;
        }

        /**
         * Creates a transaction to obtain an id, builds a
         * {@code PullMessagesRequest} asking for one message, and sends it
         * asynchronously. Any {@code Exception} is logged and not rethrown.
         */
        public void run() {
            final LocalClusteredQueue queue = LocalClusteredQueue.this;
            try {
                Transaction idSource = queue.tr.createTransaction();
                PullMessagesRequest pull = new PullMessagesRequest(queue.nodeId, idSource.getId(), this.theQueue.getChannelID(), queue.name, 1);
                queue.office.asyncSendRequest(pull, this.theQueue.getNodeId());
            }
            catch (Exception e) {
                log.error("Failed to pull message", e);
            }
        }
    }

    /**
     * Executor task that reads the size of the enclosing queue's in-memory
     * reference list and publishes it through a {@code Future}.
     */
    private class GetRefCountRunnable
    implements Runnable {
        Future result;

        /**
         * @param result the future that receives the count as an {@code Integer}
         */
        public GetRefCountRunnable(Future result) {
            this.result = result;
        }

        /**
         * Reads the current size and hands it to the future.
         */
        public void run() {
            int refCount = LocalClusteredQueue.this.messageRefs.size();
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            this.result.setResult(Integer.valueOf(refCount));
        }
    }
}

