/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.jms.server.endpoint;

import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.selector.Selector;
import org.jboss.jms.server.endpoint.ConsumerEndpoint;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.wireformat.ClientDelivery;
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.remoting.Client;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;

public class ServerConsumerEndpoint
implements Receiver,
ConsumerEndpoint {
    private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
    private boolean trace = log.isTraceEnabled();
    private int id;
    private Channel messageQueue;
    private String queueName;
    private ServerSessionEndpoint sessionEndpoint;
    private ServerInvokerCallbackHandler callbackHandler;
    private boolean noLocal;
    private Selector messageSelector;
    private JBossDestination destination;
    private Queue dlq;
    private Queue expiryQueue;
    private long redeliveryDelay;
    private boolean started;
    private Object startStopLock;
    private volatile boolean clientAccepting;
    private boolean storeDeliveries;
    private long lastDeliveryID = -1L;

    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName, ServerSessionEndpoint sessionEndpoint, String selector, boolean noLocal, JBossDestination dest, Queue dlq, Queue expiryQueue, long redeliveryDelay) throws InvalidSelectorException {
        if (this.trace) {
            log.trace("constructing consumer endpoint " + id);
        }
        this.id = id;
        this.messageQueue = messageQueue;
        this.queueName = queueName;
        this.sessionEndpoint = sessionEndpoint;
        this.callbackHandler = sessionEndpoint.getConnectionEndpoint().getCallbackHandler();
        this.noLocal = noLocal;
        this.destination = dest;
        this.dlq = dlq;
        this.redeliveryDelay = redeliveryDelay;
        this.expiryQueue = expiryQueue;
        this.clientAccepting = false;
        this.startStopLock = new Object();
        this.storeDeliveries = !dest.isTopic() || messageQueue.isRecoverable();
        this.storeDeliveries = true;
        if (selector != null) {
            if (this.trace) {
                log.trace("creating selector:" + selector);
            }
            this.messageSelector = new Selector(selector);
            if (this.trace) {
                log.trace("created selector");
            }
        }
        this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
        this.messageQueue.add(this);
        log.debug(this + " constructed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx) {
        if (this.trace) {
            log.trace(this + " receives " + ref + " for delivery");
        }
        if (!this.clientAccepting) {
            if (this.trace) {
                log.trace(this + "'s client is NOT accepting messages!");
            }
            return null;
        }
        if (ref.getMessage().isExpired()) {
            SimpleDelivery delivery = new SimpleDelivery(observer, ref, true);
            try {
                this.sessionEndpoint.expireDelivery(delivery, this.expiryQueue);
            }
            catch (Throwable t) {
                log.error("Failed to expire delivery: " + delivery, t);
            }
            return delivery;
        }
        Object object = this.startStopLock;
        synchronized (object) {
            Message message;
            if (!this.started) {
                if (this.trace) {
                    log.trace(this + " NOT started yet!");
                }
                return null;
            }
            if (this.trace) {
                log.trace(this + " has startStopLock lock, preparing the message for delivery");
            }
            boolean selectorRejected = !this.accept(message = ref.getMessage());
            SimpleDelivery delivery = new SimpleDelivery(observer, ref, !this.storeDeliveries, !selectorRejected);
            if (selectorRejected) {
                return delivery;
            }
            long deliveryId = this.storeDeliveries ? this.sessionEndpoint.addDelivery(delivery, this.id, this.dlq, this.expiryQueue, this.redeliveryDelay) : -1L;
            Client callbackClient = this.callbackHandler.getCallbackClient();
            ClientDelivery del = new ClientDelivery(message, this.id, deliveryId, ref.getDeliveryCount());
            Callback callback = new Callback(del);
            try {
                Object invoker = null;
                invoker = callbackClient != null ? callbackClient.getInvoker() : new Object();
                Object object2 = invoker;
                synchronized (object2) {
                    if (this.trace) {
                        log.trace(this + " submitting message " + message + " to the remoting layer to be sent asynchronously");
                    }
                    this.callbackHandler.handleCallbackOneway(callback);
                    this.lastDeliveryID = deliveryId;
                }
            }
            catch (HandleCallbackException e) {
                log.debug(this + " failed to handle callback", e);
                return null;
            }
            return delivery;
        }
    }

    public boolean accept(Message msg) {
        boolean accept = true;
        if (this.destination.isQueue() && this.messageSelector != null) {
            accept = this.messageSelector.accept(msg);
            if (this.trace) {
                log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message");
            }
        }
        if (accept && this.noLocal) {
            int conId = ((JBossMessage)msg).getConnectionID();
            if (this.trace) {
                log.trace("message connection id: " + conId + " current connection connection id: " + this.sessionEndpoint.getConnectionEndpoint().getConnectionID());
            }
            boolean bl = accept = conId != this.sessionEndpoint.getConnectionEndpoint().getConnectionID();
            if (this.trace) {
                log.trace("accepting? " + accept);
            }
        }
        return accept;
    }

    public long closing() throws JMSException {
        try {
            if (this.trace) {
                log.trace(this + " closing");
            }
            this.stop();
            return this.lastDeliveryID;
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
        }
    }

    public void close() throws JMSException {
        try {
            if (this.trace) {
                log.trace(this + " close");
            }
            this.localClose();
            this.sessionEndpoint.removeConsumer(this.id);
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " close");
        }
    }

    public void changeRate(float newRate) throws JMSException {
        if (this.trace) {
            log.trace(this + " changing rate to " + newRate);
        }
        try {
            this.clientAccepting = newRate > 0.0f;
            if (this.clientAccepting) {
                this.promptDelivery();
            }
        }
        catch (Throwable t) {
            throw ExceptionUtil.handleJMSInvocation(t, this + " changeRate");
        }
    }

    public String toString() {
        return "ConsumerEndpoint[" + this.id + "]";
    }

    public JBossDestination getDestination() {
        return this.destination;
    }

    public ServerSessionEndpoint getSessionEndpoint() {
        return this.sessionEndpoint;
    }

    Queue getDLQ() {
        return this.dlq;
    }

    Queue getExpiryQueue() {
        return this.expiryQueue;
    }

    long getRedliveryDelay() {
        return this.redeliveryDelay;
    }

    void localClose() throws Throwable {
        PostOffice postOffice;
        Binding binding;
        if (this.trace) {
            log.trace(this + " grabbed the main lock in close() " + this);
        }
        this.messageQueue.remove(this);
        Dispatcher.instance.unregisterTarget(this.id, (Object)this);
        if (this.destination.isTopic() && (binding = (postOffice = this.sessionEndpoint.getConnectionEndpoint().getServerPeer().getPostOfficeInstance()).getBindingForQueueName(this.queueName)) != null && !binding.getQueue().isRecoverable()) {
            Queue queue = binding.getQueue();
            if (!queue.isClustered()) {
                postOffice.unbindQueue(queue.getName());
            } else {
                ((ClusteredPostOffice)postOffice).unbindClusteredQueue(queue.getName());
            }
            String counterName = "Subscription." + this.queueName;
            MessageCounter counter = this.sessionEndpoint.getConnectionEndpoint().getServerPeer().getMessageCounterManager().unregisterMessageCounter(counterName);
            if (counter == null) {
                throw new IllegalStateException("Cannot find counter to remove " + counterName);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void start() {
        Object object = this.startStopLock;
        synchronized (object) {
            if (this.started) {
                return;
            }
            this.started = true;
        }
        this.promptDelivery();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stop() throws Throwable {
        Object object = this.startStopLock;
        synchronized (object) {
            if (!this.started) {
                return;
            }
            this.started = false;
        }
    }

    private void promptDelivery() {
        this.sessionEndpoint.promptDelivery(this.messageQueue);
    }
}

