package org.springframework.integration.dsl;

import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/springframework/integration/dsl/PublisherIntegrationFlow.class */
public class PublisherIntegrationFlow<T> extends StandardIntegrationFlow implements Publisher<Message<T>> {
    /**
     * Inert subscription: both {@code request} and {@code cancel} do nothing.
     * It is passed to {@code onSubscribe} right before a subscriber is
     * rejected with {@code onError}.
     */
    private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() {

        @Override
        public void request(long n) {
            // intentionally empty: there is nothing to request
        }

        @Override
        public void cancel() {
            // intentionally empty: there is nothing to cancel
        }

    };
    // Subscribers accepted by subscribe(); drained and completed by shutdown().
    private final Queue<Subscriber<? super Message<T>>> subscribers;
    // Channel the messages come from; used either as a SubscribableChannel or as a PollableChannel.
    private final MessageChannel messageChannel;
    // Executes the polling tasks submitted by PollableSubscription.
    private final Executor executor;

    /* loaded from: input_file:org/springframework/integration/dsl/PublisherIntegrationFlow$MessageHandlerSubscription.class */
    /**
     * Subscription used when the flow's channel is a {@link SubscribableChannel}.
     * The subscription itself is registered on the channel as a
     * {@link MessageHandler} and forwards each handled message to the subscriber
     * while there is outstanding demand.
     *
     * <p>Bounded requests are served one after another: the request being served
     * is kept in {@code currentRequest}, later ones wait in {@code pendingRequests}.
     * A request of {@code Long.MAX_VALUE} switches the subscription to unbounded
     * mode and discards the bounded bookkeeping.
     */
    private final class MessageHandlerSubscription extends PublisherIntegrationFlow<T>.SubscriberSubscription implements MessageHandler {
        /** Bounded requests received while an earlier request is still being served. */
        private final Queue<Long> pendingRequests;
        /** Size of the request currently being served, or {@code null} when there is none. */
        private final AtomicReference<Long> currentRequest;
        /** Number of messages already delivered against {@code currentRequest}. */
        private final AtomicLong count;
        /** Set once {@code Long.MAX_VALUE} has been requested; never reset. */
        private volatile boolean unbounded;

        private MessageHandlerSubscription(Subscriber<Message<?>> subscriber) {
            super(subscriber);
            this.pendingRequests = new LinkedBlockingQueue<>();
            this.currentRequest = new AtomicReference<>();
            this.count = new AtomicLong();
        }

        /**
         * Records the demand and subscribes this handler to the channel.
         *
         * @param j the number of messages requested; {@code Long.MAX_VALUE} means unbounded
         */
        @Override
        public void onRequest(long j) {
            if (j == Long.MAX_VALUE) {
                this.unbounded = true;
                this.pendingRequests.clear();
                this.currentRequest.set(null);
                this.count.set(0L);
            } else if (!this.unbounded) {
                if (this.currentRequest.get() != null) {
                    this.pendingRequests.offer(Long.valueOf(j));
                } else {
                    this.currentRequest.set(Long.valueOf(j));
                    this.count.set(0L);
                }
            }
            ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).subscribe(this);
        }

        /**
         * Forwards the message to the subscriber if there is demand for it.
         *
         * @param message the message sent to the channel
         * @throws MessageDeliveryException if the subscription is terminated, the
         *         flow is not running, or the demand is exhausted; in each case this
         *         handler is unsubscribed from the channel first
         */
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            if (this.terminated || !PublisherIntegrationFlow.this.isRunning()) {
                ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this);
                throw new MessageDeliveryException(message);
            }
            if (this.unbounded) {
                this.subscriber.onNext(message);
                return;
            }
            // Read once so a concurrent reset to null cannot cause a NullPointerException below.
            Long current = this.currentRequest.get();
            if (current == null || this.count.getAndIncrement() == current.longValue()) {
                // The current request is used up (or absent): move on to the next pending one.
                Long next = this.pendingRequests.poll();
                this.currentRequest.set(next);
                if (next == null) {
                    this.count.set(0L);
                    ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this);
                    throw new MessageDeliveryException(message);
                }
                // The message delivered below is the first one served by the new request,
                // so it must be counted; starting from 0 would deliver next + 1 messages.
                this.count.set(1L);
            }
            this.subscriber.onNext(message);
        }

        /** Unsubscribes this handler from the channel and then cancels the subscription. */
        @Override
        public void cancel() {
            ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this);
            super.cancel();
        }
    }

    /* loaded from: input_file:org/springframework/integration/dsl/PublisherIntegrationFlow$PollableSubscription.class */
    /**
     * Subscription used when the flow's channel is a {@link PollableChannel}.
     * Every request submits a task to the flow's executor that polls the channel
     * and hands the received messages to the subscriber.
     */
    private final class PollableSubscription extends PublisherIntegrationFlow<T>.SubscriberSubscription {

        private PollableSubscription(Subscriber<Message<?>> subscriber) {
            super(subscriber);
        }

        /**
         * Schedules a polling task for the requested number of messages.
         *
         * @param j the number of messages requested; {@code Long.MAX_VALUE} means unbounded
         */
        @Override
        public void onRequest(final long j) {
            Runnable pollingTask = new Runnable() {
                @Override
                public void run() {
                    PollableSubscription.this.deliver(j);
                }
            };
            PublisherIntegrationFlow.this.executor.execute(pollingTask);
        }

        /**
         * Polls the channel with a 50 ms receive timeout and forwards each received
         * message to the subscriber. The loop ends when the subscription is
         * terminated, the flow is no longer running, or - for a bounded request -
         * {@code requested} messages have been delivered.
         */
        private void deliver(long requested) {
            final boolean unlimited = requested == Long.MAX_VALUE;
            long delivered = 0;
            while (!this.terminated
                    && PublisherIntegrationFlow.this.isRunning()
                    && (unlimited || delivered < requested)) {
                Message<?> received = ((PollableChannel) PublisherIntegrationFlow.this.messageChannel).receive(50L);
                if (received == null) {
                    // timed out without a message: re-check the loop conditions
                    continue;
                }
                this.subscriber.onNext(received);
                if (!unlimited) {
                    delivered++;
                }
            }
        }
    }

    /* loaded from: input_file:org/springframework/integration/dsl/PublisherIntegrationFlow$SubscriberSubscription.class */
    /**
     * Base class for the subscriptions handed out by this publisher. It validates
     * {@link #request(long)} calls, tracks cancellation in {@code terminated}, and
     * delegates valid demand to {@link #onRequest(long)}.
     */
    private abstract class SubscriberSubscription implements Subscription {
        /** The subscriber this subscription delivers to. */
        final Subscriber<Message<?>> subscriber;
        /** {@code true} once {@link #cancel()} has been called. */
        volatile boolean terminated;

        SubscriberSubscription(Subscriber<Message<?>> subscriber) {
            this.subscriber = subscriber;
        }

        /**
         * Validates the demand and passes it on to {@link #onRequest(long)}.
         *
         * <p>A call on an already cancelled subscription is ignored. A non-positive
         * {@code j} cancels the subscription and signals {@code onError} with an
         * {@link IllegalArgumentException}; cancelling first ensures that no further
         * signal reaches the subscriber after that terminal {@code onError}.
         * A valid request is ignored while the flow is not running.
         *
         * @param j the number of messages requested
         */
        @Override
        public void request(long j) {
            if (this.terminated) {
                // Requests on a cancelled subscription are no-ops.
                return;
            }
            if (j <= 0) {
                cancel();
                this.subscriber.onError(new IllegalArgumentException("Spec. Rule 3.9 - Cannot request a non strictly positive number: " + j));
                return;
            }
            if (PublisherIntegrationFlow.this.isRunning()) {
                onRequest(j);
            }
        }

        /** Removes the subscriber from the flow's subscriber queue and marks this subscription terminated. */
        @Override
        public void cancel() {
            PublisherIntegrationFlow.this.subscribers.remove(this.subscriber);
            this.terminated = true;
        }

        /**
         * Handles a validated request; only called with a strictly positive
         * {@code j} while the subscription is not terminated and the flow is running.
         */
        protected abstract void onRequest(long j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the flow and calls {@code start()} on it right away.
     *
     * @param set the components passed to the {@code StandardIntegrationFlow} constructor
     * @param messageChannel the channel the published messages are taken from
     * @param executor the executor used to run polling tasks for pollable channels
     */
    public PublisherIntegrationFlow(Set<Object> set, MessageChannel messageChannel, Executor executor) {
        super(set);
        this.messageChannel = messageChannel;
        this.executor = executor;
        this.subscribers = new LinkedBlockingQueue<>();
        start();
    }

    /**
     * Registers the subscriber and hands it a subscription matching the channel
     * type: a {@code MessageHandlerSubscription} for a {@link SubscribableChannel},
     * a {@code PollableSubscription} for a {@link PollableChannel}.
     *
     * <p>If the flow is not running, or the channel is of neither type, the
     * subscriber receives a no-op subscription followed by {@code onError} with an
     * {@link IllegalStateException}. Such a rejected subscriber is never added to
     * the subscriber queue, so {@code shutdown()} cannot send it {@code onComplete}
     * after the error.
     *
     * @param subscriber the subscriber to register
     */
    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(Subscriber<? super Message<T>> subscriber) {
        if (!isRunning()) {
            subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
            subscriber.onError(new IllegalStateException("The Publisher must be started ('Lifecycle.start()') before accepting subscription."));
            return;
        }
        // The subscriptions are declared against Subscriber<Message<?>>; the cast is
        // unchecked because the wildcard bound cannot be verified at runtime.
        final Subscription subscription;
        if (this.messageChannel instanceof SubscribableChannel) {
            subscription = new MessageHandlerSubscription((Subscriber<Message<?>>) subscriber);
        } else if (this.messageChannel instanceof PollableChannel) {
            subscription = new PollableSubscription((Subscriber<Message<?>>) subscriber);
        } else {
            subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
            subscriber.onError(new IllegalStateException("Unsupported MessageChannel type [" + this.messageChannel + "]. Must be 'SubscribableChannel' or 'PollableChannel'."));
            return;
        }
        // Register before onSubscribe so that a cancel() issued from within
        // onSubscribe finds the subscriber in the queue and removes it.
        this.subscribers.add(subscriber);
        subscriber.onSubscribe(subscription);
    }

    /**
     * Stops the flow via the superclass and then completes every registered
     * subscriber through {@link #shutdown()}.
     */
    @Override
    public void stop() {
        super.stop();
        this.shutdown();
    }

    /**
     * Drains the subscriber queue, signalling {@code onComplete} to each
     * subscriber as it is removed.
     */
    public void shutdown() {
        Subscriber<? super Message<T>> next;
        while ((next = this.subscribers.poll()) != null) {
            next.onComplete();
        }
    }
}
