package jeus.jms.server.manager;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import jeus.jms.common.message.MessageID;
import jeus.jms.common.util.log.JeusMessage_JMS5;
import jeus.jms.common.util.log.LogUtils;
import jeus.jms.server.AbstractConsumer;
import jeus.jms.server.JMSMessageConsumer;
import jeus.jms.server.comm.JMSBroker;
import jeus.jms.server.manager.DispatchMessageQueue;
import jeus.jms.server.message.ServerMessage;
import jeus.jms.server.message.SubscriptionEvent;
import jeus.jms.server.message.SubscriptionMessage;
import jeus.util.ScheduleTask;
import jeus.util.ScheduledExecutor;
import jeus.util.collection.queue.ProcessableMessage;
import jeus.util.collection.queue.SerialExecutable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:jeus/jms/server/manager/SubscriptionQueue.class */
public class SubscriptionQueue extends DispatchMessageQueue<SubscriptionMessage> {
    // Messages registered by enqueueAcknowledge() and not yet removed, keyed by message ID.
    private final AcknowledgeQueue ackQueue;
    // Owning manager: receives subscription events and performs the actual dispatch.
    private final SubscriptionManager manager;
    // One permit is taken per message added to ackQueue and returned when it is removed;
    // created with manager.getMaxPendingLimit() permits.
    private final Semaphore semaphore;
    // Independent suspend reasons; in this class they are written only while holding suspendLock.
    private boolean suspendedByDestination;
    private boolean suspendedByClient;
    private boolean suspendedByFlowControl;
    // Monitor guarding updates to the three suspendedBy* flags.
    private final Object suspendLock;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:jeus/jms/server/manager/SubscriptionQueue$AcknowledgeTask.class */
    /**
     * Runnable that acknowledges a single message on the enclosing queue.
     * Running it is equivalent to calling {@code acknowledge(messageID)} on the
     * owning {@code SubscriptionQueue}.
     */
    public class AcknowledgeTask implements Runnable {
        /** ID of the message to acknowledge when the task runs. */
        private final MessageID targetId;

        /**
         * @param messageID ID of the message this task will acknowledge
         */
        public AcknowledgeTask(MessageID messageID) {
            targetId = messageID;
        }

        @Override // java.lang.Runnable
        public void run() {
            SubscriptionQueue.this.acknowledge(targetId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a queue owned by the given manager, using the superclass's no-argument constructor.
     * The acknowledge queue receives the manager's max pending limit and resume-dispatch factor,
     * and the permit semaphore starts with max-pending-limit permits (non-fair).
     *
     * @param subscriptionManager owning manager; must not be null
     */
    public SubscriptionQueue(SubscriptionManager subscriptionManager) {
        suspendLock = new Object();
        manager = subscriptionManager;
        ackQueue = new AcknowledgeQueue(
                subscriptionManager,
                this,
                subscriptionManager.getMaxPendingLimit(),
                subscriptionManager.getResumeDispatchFactor());
        semaphore = new Semaphore(subscriptionManager.getMaxPendingLimit(), false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a queue owned by the given manager; {@code z}, {@code str} and {@code str2} are
     * forwarded unchanged to the superclass constructor.
     * The acknowledge queue receives the manager's max pending limit and resume-dispatch factor,
     * and the permit semaphore starts with max-pending-limit permits (non-fair).
     *
     * @param subscriptionManager owning manager; must not be null
     * @param z                   passed through to {@code DispatchMessageQueue}
     * @param str                 passed through to {@code DispatchMessageQueue}
     * @param str2                passed through to {@code DispatchMessageQueue}
     */
    public SubscriptionQueue(SubscriptionManager subscriptionManager, boolean z, String str, String str2) {
        super(z, str, str2);
        suspendLock = new Object();
        manager = subscriptionManager;
        ackQueue = new AcknowledgeQueue(
                subscriptionManager,
                this,
                subscriptionManager.getMaxPendingLimit(),
                subscriptionManager.getResumeDispatchFactor());
        semaphore = new Semaphore(subscriptionManager.getMaxPendingLimit(), false);
    }

    /**
     * @return the manager's current max pending limit, used here as the total permit count
     */
    private int getMaxAckQueueSize() {
        return manager.getMaxPendingLimit();
    }

    /**
     * @return the number of entries currently held in the acknowledge queue
     */
    public int getAckQueueSize() {
        return ackQueue.size();
    }

    /**
     * Notifies the superclass that the consumer closed, then waits for every acknowledge permit.
     * All permits are available only when no message is being tracked for acknowledgement.
     * <ul>
     *   <li>{@code j > 0}: waits at most {@code j} milliseconds for all permits.</li>
     *   <li>{@code j == 0}: waits without a time bound.</li>
     *   <li>{@code j < 0}: does not wait.</li>
     * </ul>
     * Permits obtained here are not released by this method. The result of the timed attempt
     * is intentionally ignored, so a timeout simply ends the wait.
     * If the wait is interrupted, the event is logged and the thread's interrupt status is
     * restored so callers can still observe the interruption.
     *
     * @param j wait bound as described above
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public void consumerClosed(long j) {
        super.consumerClosed(j);
        try {
            if (j > 0) {
                if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6200_LEVEL)) {
                    LogUtils.log(logger, JeusMessage_JMS5._6200_LEVEL, JeusMessage_JMS5._6200, Long.valueOf(j));
                }
                this.semaphore.tryAcquire(getMaxAckQueueSize(), j, TimeUnit.MILLISECONDS);
            } else if (j == 0) {
                if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6201_LEVEL)) {
                    LogUtils.log(logger, JeusMessage_JMS5._6201_LEVEL, JeusMessage_JMS5._6201);
                }
                this.semaphore.acquire(getMaxAckQueueSize());
            }
        } catch (InterruptedException e) {
            if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6202_LEVEL)) {
                LogUtils.log(logger, JeusMessage_JMS5._6202_LEVEL, JeusMessage_JMS5._6202, (Throwable) e);
            }
            // Catching InterruptedException clears the flag; put it back for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a message as awaiting acknowledgement.
     * Blocks until a permit is available, then stores the message in the acknowledge queue under
     * its message ID. If an entry for that ID already existed, the extra permit is returned so
     * that one permit corresponds to one tracked ID.
     * <p>
     * If the thread is interrupted while waiting for a permit, the message is NOT tracked; the
     * interrupt status is restored instead of being silently discarded.
     *
     * @param subscriptionMessage message to track until it is acknowledged or recovered
     */
    public void enqueueAcknowledge(SubscriptionMessage subscriptionMessage) {
        if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6225_LEVEL)) {
            LogUtils.log(logger, JeusMessage_JMS5._6225_LEVEL, JeusMessage_JMS5._6225, subscriptionMessage.getMessageID());
        }
        try {
            this.semaphore.acquire();
            if (this.ackQueue.put(subscriptionMessage.getMessageID(), subscriptionMessage) != null) {
                // Replaced an existing entry: the ID already held a permit, give this one back.
                this.semaphore.release();
            }
        } catch (InterruptedException e) {
            // No permit was obtained and nothing was queued; preserve the interruption
            // for the caller rather than swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Looks up a tracked message without removing it.
     *
     * @param messageID ID to look up in the acknowledge queue
     * @return the value the acknowledge queue holds for that ID
     */
    public SubscriptionMessage findAcknowledge(MessageID messageID) {
        boolean traceEnabled = LogUtils.isLoggable(logger, JeusMessage_JMS5._6227_LEVEL);
        if (traceEnabled) {
            LogUtils.log(logger, JeusMessage_JMS5._6227_LEVEL, JeusMessage_JMS5._6227, messageID);
        }
        SubscriptionMessage tracked = ackQueue.get(messageID);
        return tracked;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops tracking a message and, if it was actually tracked, returns its permit.
     *
     * @param messageID ID of the message to remove from the acknowledge queue
     * @return the removed message, or {@code null} when the queue held nothing for that ID
     */
    public SubscriptionMessage removeAcknowledge(MessageID messageID) {
        boolean traceEnabled = LogUtils.isLoggable(logger, JeusMessage_JMS5._6228_LEVEL);
        if (traceEnabled) {
            LogUtils.log(logger, JeusMessage_JMS5._6228_LEVEL, JeusMessage_JMS5._6228, messageID);
        }
        SubscriptionMessage removed = ackQueue.remove((Object) messageID);
        if (removed == null) {
            // Nothing was tracked, so there is no permit to give back.
            return null;
        }
        semaphore.release();
        return removed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands every message still awaiting acknowledgement back via {@code recoverMessage}.
     * The tracked messages are snapshotted, the acknowledge queue is cleared (restoring its
     * permits), and the snapshot is then recovered in descending natural order.
     * <p>
     * The elements are passed to {@code recoverMessage} as {@code SubscriptionMessage}; the
     * previous cast to {@code SubscriptionQueue} could never succeed for a message instance.
     */
    public void returnUnackedMessages() {
        // Copy first: clearAcknowledges() empties the live acknowledge queue.
        LinkedList<SubscriptionMessage> unacked = new LinkedList<SubscriptionMessage>(this.ackQueue.values());
        clearAcknowledges();
        // A null comparator means "natural ordering", exactly like Collections.sort(list),
        // without requiring a compile-time Comparable bound on SubscriptionMessage.
        Collections.sort(unacked, null);
        // Sort-then-reverse (rather than a reversed comparator) keeps the original tie order.
        Collections.reverse(unacked);
        for (SubscriptionMessage message : unacked) {
            recoverMessage(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Drops every tracked message and tops the semaphore back up to the max pending limit
     * by releasing the difference between that limit and the permits currently available.
     */
    public void clearAcknowledges() {
        ackQueue.clear();
        int permitsToRestore = getMaxAckQueueSize() - semaphore.availablePermits();
        semaphore.release(permitsToRestore);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Routes a message either to a synchronous request or to the serial executor.
     * When {@code acquireSyncRequest} yields a request, the message is executed on it directly;
     * otherwise this queue is submitted to the broker's internal serial executor.
     *
     * @param subscriptionMessage message to dispatch
     * @param enqueueReason       reason passed on to {@code acquireSyncRequest}
     */
    public void dispatchMessage(SubscriptionMessage subscriptionMessage, DispatchMessageQueue.EnqueueReason enqueueReason) {
        SyncMessageRequest<SubscriptionMessage> syncRequest = acquireSyncRequest(subscriptionMessage, enqueueReason);
        if (syncRequest == null) {
            JMSBroker.getInternalSerialExecutor().execute((SerialExecutable) this);
            return;
        }
        syncRequest.execute(subscriptionMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Acknowledges one message: removes it from the acknowledge queue and, if it was tracked,
     * reports {@code SubscriptionEvent.DELIVERED} to the manager. Unknown IDs are ignored.
     *
     * @param messageID ID of the message being acknowledged
     */
    public void acknowledge(MessageID messageID) {
        SubscriptionMessage delivered = removeAcknowledge(messageID);
        if (delivered == null) {
            return;
        }
        manager.onSubscriptionEvent(delivered, SubscriptionEvent.DELIVERED);
    }

    /**
     * Clears the message's observed flag and hands it to the superclass's {@code enqueueMessage}.
     *
     * @param subscriptionMessage message to enqueue
     * @return the superclass's result, cast to {@code SubscriptionMessage}
     */
    public SubscriptionMessage enqueueMessage(SubscriptionMessage subscriptionMessage) {
        subscriptionMessage.setObserved(false);
        // NOTE(review): both casts look like decompiler residue from generic erasure — confirm
        // against DispatchMessageQueue.enqueueMessage before simplifying.
        return (SubscriptionMessage) super.enqueueMessage((ProcessableMessage) subscriptionMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Takes a message out of the acknowledge queue and puts it back up for delivery.
     * Unknown IDs are ignored. An expired message receives {@code SubscriptionEvent.EXPIRED}
     * instead of being recovered. Otherwise its subscription status is reset, its redelivered
     * flag is set to {@code z}, and it is passed to {@code recoverMessage}.
     * <p>
     * The message is passed as a {@code SubscriptionMessage}; the previous cast to
     * {@code SubscriptionQueue} could never succeed for a message instance.
     *
     * @param messageID ID of the message to recover
     * @param z         value for the message's JMSRedelivered flag
     */
    public void recover(MessageID messageID, boolean z) {
        SubscriptionMessage removeAcknowledge = removeAcknowledge(messageID);
        if (removeAcknowledge == null) {
            return;
        }
        if (removeAcknowledge.isExpired()) {
            removeAcknowledge.onSubscriptionEvent(SubscriptionEvent.EXPIRED);
            return;
        }
        // NOTE(review): status code 1 is an opaque constant here — confirm which state it names.
        removeAcknowledge.getSubscriptionStatus().setCurrent((short) 1);
        removeAcknowledge.setJMSRedelivered(z);
        recoverMessage(removeAcknowledge);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a message from the acknowledge queue immediately and schedules its redelivery.
     * Unknown IDs are ignored. When the scheduled task runs, an expired message receives
     * {@code SubscriptionEvent.EXPIRED}; otherwise its status is reset, it is flagged as
     * redelivered, and it is dispatched with reason {@code RECOVER}.
     *
     * @param messageID ID of the message to recover
     * @param j         delay handed to the scheduler (presumably milliseconds — TODO confirm)
     */
    public void recoverWithDelay(MessageID messageID, long j) {
        final SubscriptionMessage pending = removeAcknowledge(messageID);
        if (pending != null) {
            ScheduleTask redelivery = new ScheduleTask() { // from class: jeus.jms.server.manager.SubscriptionQueue.1
                public void run() {
                    if (!pending.isExpired()) {
                        pending.getSubscriptionStatus().setCurrent((short) 1);
                        pending.setJMSRedelivered(true);
                        SubscriptionQueue.this.dispatchMessage(pending, DispatchMessageQueue.EnqueueReason.RECOVER);
                    } else {
                        pending.onSubscriptionEvent(SubscriptionEvent.EXPIRED);
                    }
                }
            };
            ScheduledExecutor.getInstance().schedule(redelivery, j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a message from the acknowledge queue and re-dispatches it with reason
     * {@code RECOVER}, leaving its status and redelivered flag untouched. Unknown IDs are
     * ignored; an expired message receives {@code SubscriptionEvent.EXPIRED} instead.
     *
     * @param messageID ID of the message to return
     */
    public void returnBack(MessageID messageID) {
        SubscriptionMessage returned = removeAcknowledge(messageID);
        if (returned != null) {
            if (!returned.isExpired()) {
                dispatchMessage(returned, DispatchMessageQueue.EnqueueReason.RECOVER);
            } else {
                returned.onSubscriptionEvent(SubscriptionEvent.EXPIRED);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Treats every message held by this queue as delivered.
     * First reports {@code DELIVERED} for each message awaiting acknowledgement and clears the
     * acknowledge queue; then, under the executing lock, does the same for each message still
     * queued for dispatch and clears that queue.
     */
    public void acknowledgeAll() {
        if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6701_LEVEL)) {
            LogUtils.log(logger, JeusMessage_JMS5._6701_LEVEL, JeusMessage_JMS5._6701);
        }
        for (SubscriptionMessage awaitingAck : ackQueue.values()) {
            manager.onSubscriptionEvent(awaitingAck, SubscriptionEvent.DELIVERED);
        }
        clearAcknowledges();
        synchronized (getExecutingLock()) {
            for (Object queued : this.queue.values()) {
                manager.onSubscriptionEvent((SubscriptionMessage) queued, SubscriptionEvent.DELIVERED);
            }
            clearQueuedMessage();
        }
    }

    /**
     * @return the current value of the destination-suspend flag (read without taking the suspend lock)
     */
    public boolean isSuspendedByDestination() {
        return suspendedByDestination;
    }

    /**
     * Sets or clears the destination suspend reason.
     * The superclass suspend state is changed only when neither of the other two reasons
     * (client, flow control) is currently set.
     *
     * @param z {@code true} to suspend, {@code false} to resume
     * @return {@code false} if the flag already had that value, {@code true} if it was changed
     */
    public boolean suspendByDestination(boolean z) {
        synchronized (suspendLock) {
            if (suspendedByDestination == z) {
                return false;
            }
            suspendedByDestination = z;
            boolean heldByOtherReason = suspendedByClient || suspendedByFlowControl;
            if (!heldByOtherReason) {
                super.setSuspend(z);
            }
            return true;
        }
    }

    /**
     * Sets or clears the flow-control suspend reason.
     * The superclass suspend state is changed only when neither of the other two reasons
     * (client, destination) is currently set.
     *
     * @param z {@code true} to suspend, {@code false} to resume
     * @return {@code false} if the flag already had that value, {@code true} if it was changed
     */
    public boolean suspendByFlowControl(boolean z) {
        synchronized (suspendLock) {
            if (suspendedByFlowControl == z) {
                return false;
            }
            suspendedByFlowControl = z;
            boolean heldByOtherReason = suspendedByClient || suspendedByDestination;
            if (!heldByOtherReason) {
                super.setSuspend(z);
            }
            return true;
        }
    }

    /**
     * Sets or clears the client suspend reason.
     * The superclass suspend state is changed only when neither of the other two reasons
     * (flow control, destination) is currently set.
     *
     * @param z {@code true} to suspend, {@code false} to resume
     * @return {@code false} if the flag already had that value, {@code true} if it was changed
     */
    public boolean suspendByClient(boolean z) {
        synchronized (suspendLock) {
            if (z == suspendedByClient) {
                return false;
            }
            suspendedByClient = z;
            boolean heldByOtherReason = suspendedByFlowControl || suspendedByDestination;
            if (!heldByOtherReason) {
                super.setSuspend(z);
            }
            return true;
        }
    }

    /**
     * @return {@code true} only when the manager reports it is dispatchable and the superclass
     *         also considers this queue executable; the superclass is not consulted otherwise
     */
    public boolean isExecutable() {
        if (!manager.isDispatchable()) {
            return false;
        }
        return super.isExecutable();
    }

    /**
     * Prepares a message immediately before it is dispatched.
     * Clears its request flag; when the current consumer is a {@code JMSMessageConsumer}, marks
     * the message as direct if the session's acknowledge mode is -1 and the message type is not
     * 70; then bumps the delivery count and registers the message as awaiting acknowledgement.
     *
     * @param subscriptionMessage message about to be dispatched
     * @return always {@code true}
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public boolean preProcess(SubscriptionMessage subscriptionMessage) {
        subscriptionMessage.setRequestFlag(false);
        AbstractConsumer consumer = manager.getMessageConsumer();
        // instanceof is false for null, so no separate null check is needed.
        if (consumer instanceof JMSMessageConsumer) {
            // NOTE(review): -1 and 70 are opaque constants here — confirm which ack mode / message type they name.
            boolean direct = ((JMSMessageConsumer) consumer).getSession().getAcknowledgeMode() == -1
                    && subscriptionMessage.getMessageType() != 70;
            subscriptionMessage.setDirect(direct);
        }
        subscriptionMessage.increaseDeliveryCount();
        enqueueAcknowledge(subscriptionMessage);
        return true;
    }

    /**
     * Performs the dispatch by delegating to the manager.
     *
     * @param subscriptionMessage message to dispatch
     * @return the manager's dispatch result
     * @throws Exception whatever the manager's dispatch throws
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public boolean process(SubscriptionMessage subscriptionMessage) throws Exception {
        boolean dispatched = manager.dispatch(subscriptionMessage);
        return dispatched;
    }

    /**
     * Handles an exception raised for a message: logs it when the level is enabled and tells
     * the manager the dispatch failed.
     *
     * @param subscriptionMessage message whose dispatch failed
     * @param exc                 the exception that was raised
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public void onException(SubscriptionMessage subscriptionMessage, Exception exc) {
        boolean shouldLog = logger.isLoggable(JeusMessage_JMS5._6704_LEVEL);
        if (shouldLog) {
            logger.log(JeusMessage_JMS5._6704_LEVEL, JeusMessage_JMS5._6704, subscriptionMessage, exc);
        }
        manager.dispatchFailed(subscriptionMessage);
    }

    /**
     * Handles a failure raised for a message: logs it when the level is enabled and tells the
     * manager the dispatch failed.
     *
     * @param subscriptionMessage message whose dispatch failed
     * @param th                  the throwable that was raised
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public void onFail(SubscriptionMessage subscriptionMessage, Throwable th) {
        boolean shouldLog = logger.isLoggable(JeusMessage_JMS5._6705_LEVEL);
        if (shouldLog) {
            logger.log(JeusMessage_JMS5._6705_LEVEL, JeusMessage_JMS5._6705, subscriptionMessage, th);
        }
        manager.dispatchFailed(subscriptionMessage);
    }

    /**
     * Finishes handling of a message after a dispatch attempt, based on result code {@code i}.
     * <ul>
     *   <li>Any non-zero code: the delivery count bumped in {@code preProcess} is rolled back.</li>
     *   <li>1: the message is recovered (not flagged redelivered) and {@code false} is returned.</li>
     *   <li>2 or 3: the message is removed from the acknowledge queue.</li>
     *   <li>0 with a {@code JMSMessageConsumer} whose session ack mode is -1, and message type
     *       other than 70: the message is acknowledged via the thread pool, or inline if
     *       submitting the task is interrupted.</li>
     * </ul>
     *
     * @param subscriptionMessage message that was just processed
     * @param i                   result code of the dispatch attempt
     * @return {@code false} for code 1, {@code true} otherwise
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public boolean postProcess(SubscriptionMessage subscriptionMessage, int i) {
        if (i != 0) {
            subscriptionMessage.decreaseDeliveryCount();
        }
        if (i == 1) {
            recover(subscriptionMessage.getMessageID(), false);
            return false;
        }
        if (i == 2 || i == 3) {
            removeAcknowledge(subscriptionMessage.getMessageID());
        }
        // The consumer is looked up for every remaining code, as before.
        AbstractConsumer consumer = manager.getMessageConsumer();
        boolean autoAcknowledge = i == 0
                && consumer instanceof JMSMessageConsumer
                && ((JMSMessageConsumer) consumer).getSession().getAcknowledgeMode() == -1
                && subscriptionMessage.getMessageType() != 70;
        if (autoAcknowledge) {
            try {
                ThreadPoolManager.execute(new AcknowledgeTask(subscriptionMessage.getMessageID()));
            } catch (InterruptedException e) {
                // Could not hand the task off; acknowledge on the current thread instead.
                acknowledge(subscriptionMessage.getMessageID());
            }
        }
        if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6703_LEVEL)) {
            LogUtils.log(logger, JeusMessage_JMS5._6703_LEVEL, JeusMessage_JMS5._6703, Integer.valueOf(this.queue.size()));
        }
        return true;
    }

    /**
     * Hook invoked before an execution run; no preparation work is done here.
     *
     * @return always {@code true}
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public boolean prepareExecution() {
        return true;
    }

    /**
     * Hook invoked when an execution run ends; notifies the manager that dispatching completed.
     */
    @Override // jeus.jms.server.manager.DispatchMessageQueue
    public void executionEnded() {
        manager.dispatchCompleted();
    }

    /**
     * Applies a new selector to the messages still queued for dispatch.
     * Under the executing lock, every queued message the selector does not select is removed
     * from the queue and collected, in iteration order, into the returned map.
     *
     * @param messageSelector selector to evaluate each queued message against
     * @return the removed entries (key and value as stored in the queue); empty if all still match
     */
    public Map<MessageID, SubscriptionMessage> selectorUpdated(MessageSelector messageSelector) {
        LinkedHashMap rejected = new LinkedHashMap();
        synchronized (getExecutingLock()) {
            for (Iterator cursor = this.queue.entrySet().iterator(); cursor.hasNext(); ) {
                Map.Entry candidate = (Map.Entry) cursor.next();
                if (messageSelector.isSelected((ServerMessage) candidate.getValue())) {
                    continue;
                }
                rejected.put(candidate.getKey(), candidate.getValue());
                cursor.remove();
            }
        }
        if (LogUtils.isLoggable(logger, JeusMessage_JMS5._6702_LEVEL)) {
            LogUtils.log(logger, JeusMessage_JMS5._6702_LEVEL, JeusMessage_JMS5._6702, rejected);
        }
        return rejected;
    }

    /**
     * @return the live acknowledge queue backing this subscription queue (not a copy)
     */
    public AcknowledgeQueue getAckQueue() {
        return ackQueue;
    }
}
