/*
 * Decompiled with CFR 0.152.
 */
package org.activeio.adapter;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.activeio.AsynchChannel;
import org.activeio.ChannelFactory;
import org.activeio.FilterAsynchChannel;
import org.activeio.Packet;

/**
 * Filter that decouples callers of {@link #write(Packet)} and {@link #flush()}
 * from the wrapped channel: each request is handed to an
 * {@link ObjectDispatcher}, which later feeds it back into
 * {@link #onObject(Object)} on an executor thread, where it is applied to
 * {@code next}.
 *
 * <p>I/O failures raised while applying a queued request are reported through
 * {@code channelListener.onPacketError} rather than thrown to the original
 * caller, because the caller has usually returned by then.
 */
public class AsynchWriteAsynchChannelAdapter
extends FilterAsynchChannel {
    private final ObjectDispatcher dispatcher;

    /** Marker object queued to request a flush that nobody waits for. */
    private static final Object FLUSH_COMMAND = new Object();

    /**
     * Creates an adapter with a request queue of 10 entries.
     *
     * @param next the channel that queued writes and flushes are applied to
     */
    public AsynchWriteAsynchChannelAdapter(AsynchChannel next) {
        this(next, 10);
    }

    /**
     * Creates an adapter with the given request queue capacity.
     *
     * @param next      the channel that queued writes and flushes are applied to
     * @param queueSize capacity of the bounded buffer holding pending requests
     */
    public AsynchWriteAsynchChannelAdapter(AsynchChannel next, int queueSize) {
        super(next);
        this.dispatcher = new ObjectDispatcher(this, queueSize);
    }

    /**
     * Applies one queued request to the wrapped channel. Called by the
     * dispatcher, not by channel users.
     *
     * <ul>
     * <li>{@code FLUSH_COMMAND}: flush the wrapped channel.</li>
     * <li>a {@link Latch}: flush the wrapped channel, then release the latch so
     *     the thread waiting in {@code flush(long)} can continue.</li>
     * <li>anything else: cast to {@link Packet} and write it.</li>
     * </ul>
     *
     * @param o the queued request
     */
    public void onObject(Object o) {
        try {
            if (o == FLUSH_COMMAND) {
                this.next.flush();
                return;
            }
            if (o instanceof Latch) {
                try {
                    this.next.flush();
                }
                finally {
                    // Release even when the flush fails; otherwise the thread
                    // blocked on this latch would never be woken up.
                    ((Latch)o).release();
                }
                return;
            }
            this.next.write((Packet)o);
        }
        catch (IOException e) {
            this.channelListener.onPacketError(e);
        }
    }

    /**
     * Queues a packet for writing to the wrapped channel.
     *
     * @param packet the packet to write
     * @throws IOException an {@link InterruptedIOException} if the calling
     *         thread is interrupted while handing the packet to the dispatcher
     */
    public void write(Packet packet) throws IOException {
        try {
            this.dispatcher.add(packet);
        }
        catch (InterruptedException e) {
            throw AsynchWriteAsynchChannelAdapter.interrupted(e);
        }
    }

    /**
     * Queues a flush of the wrapped channel and returns without waiting for it
     * to be carried out.
     *
     * @throws IOException an {@link InterruptedIOException} if the calling
     *         thread is interrupted while queueing the request
     */
    public void flush() throws IOException {
        this.flush(0L);
    }

    /**
     * Queues a flush and blocks until it has been processed.
     *
     * <p>NOTE(review): {@code timeout} is ignored (the wait is unbounded) and
     * this override does not itself stop the wrapped channel or call the
     * superclass; confirm against FilterAsynchChannel that this is intended.
     *
     * @param timeout currently unused
     * @throws IOException an {@link InterruptedIOException} if the calling
     *         thread is interrupted while queueing or waiting
     */
    public void stop(long timeout) throws IOException {
        this.flush(-1L);
    }

    /**
     * Queues a flush request and optionally waits for it to be processed.
     *
     * @param timeout {@code 0} queues the flush and returns immediately;
     *        {@code -1} waits until the flush has been processed; any other
     *        value is passed to {@code Latch.attempt} as the maximum wait
     * @throws InterruptedIOException if the calling thread is interrupted
     */
    private void flush(long timeout) throws InterruptedIOException {
        try {
            if (timeout == 0L) {
                this.dispatcher.add(FLUSH_COMMAND);
            } else if (timeout == -1L) {
                Latch l = new Latch();
                this.dispatcher.add(l);
                l.acquire();
            } else {
                Latch l = new Latch();
                this.dispatcher.add(l);
                // The result is deliberately not inspected: timing out is not an error here.
                l.attempt(timeout);
            }
        }
        catch (InterruptedException e) {
            throw AsynchWriteAsynchChannelAdapter.interrupted(e);
        }
    }

    /**
     * Converts an interruption into the I/O flavoured exception this class
     * exposes, restoring the thread's interrupt status so code further up the
     * stack can still observe it.
     */
    private static InterruptedIOException interrupted(InterruptedException cause) {
        Thread.currentThread().interrupt();
        InterruptedIOException ioe = new InterruptedIOException("Interrupted while handing a request to the asynchronous writer");
        ioe.initCause(cause);
        return ioe;
    }

    /**
     * Hands objects to an adapter's {@link #onObject(Object)} through a
     * {@link PooledExecutor} backed by a bounded buffer. The executor is
     * constructed with {@code 1} as its second argument and configured with
     * {@code waitWhenBlocked()}.
     */
    public static class ObjectDispatcher {
        private final PooledExecutor executor;
        private final AsynchWriteAsynchChannelAdapter objectListener;

        /**
         * Creates a dispatcher with a queue of 10 entries.
         *
         * @param objectListener the adapter that receives dispatched objects
         */
        public ObjectDispatcher(AsynchWriteAsynchChannelAdapter objectListener) {
            this(objectListener, 10);
        }

        /**
         * Creates a dispatcher with the given queue capacity.
         *
         * @param objectListener the adapter that receives dispatched objects
         * @param queueSize      capacity of the executor's bounded buffer
         */
        public ObjectDispatcher(AsynchWriteAsynchChannelAdapter objectListener, int queueSize) {
            this.objectListener = objectListener;
            this.executor = new PooledExecutor(new BoundedBuffer(queueSize), 1);
            this.executor.waitWhenBlocked();
        }

        /**
         * Schedules {@code o} to be passed to the listener's
         * {@code onObject} on an executor thread.
         *
         * @param o the object to dispatch
         * @throws InterruptedException if the executor's {@code execute} call
         *         is interrupted
         */
        public void add(final Object o) throws InterruptedException {
            this.executor.execute(new Runnable(){

                public void run() {
                    ObjectDispatcher.this.objectListener.onObject(o);
                }
            });
        }
    }

    /**
     * Alternative dispatcher that keeps its own queue and a counter of pending
     * objects, and submits itself to an {@link Executor} to drain the queue
     * when the counter goes from zero to one. Nothing in this class
     * instantiates it.
     */
    public static class ObjectDispatcherX
    implements Runnable {
        private final Executor executor;
        private final Channel queue;
        /** Number of objects added but not yet accounted for by {@link #run()}. */
        private final SynchronizedInt size = new SynchronizedInt(0);
        private final AsynchWriteAsynchChannelAdapter objectListener;
        /** Timeout passed to {@code queue.poll} while draining. */
        private long pollDelay = 10L;

        /**
         * Creates a dispatcher with a queue of 10 entries.
         *
         * @param objectListener the adapter that receives dispatched objects
         */
        public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener) {
            this(objectListener, 10);
        }

        /**
         * Creates a dispatcher using {@code ChannelFactory.DEFAULT_EXECUTOR}
         * and a bounded buffer of the given capacity.
         *
         * @param objectListener the adapter that receives dispatched objects
         * @param queueSize      capacity of the bounded buffer
         */
        public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener, int queueSize) {
            this(objectListener, ChannelFactory.DEFAULT_EXECUTOR, new BoundedBuffer(queueSize));
        }

        /**
         * Creates a dispatcher with an explicit executor and queue.
         *
         * @param objectListener the adapter that receives dispatched objects
         * @param executor       runs this dispatcher when there is work to drain
         * @param queue          holds objects awaiting dispatch
         */
        public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener, Executor executor, Channel queue) {
            this.objectListener = objectListener;
            this.executor = executor;
            this.queue = queue;
        }

        /**
         * Queues {@code o} and, if the pending counter was zero beforehand,
         * submits this dispatcher to the executor.
         *
         * <p>NOTE(review): the counter is incremented before {@code put}; if
         * {@code put} is interrupted the counter is not rolled back. Statement
         * order is left untouched because {@link #run()} depends on it.
         *
         * @param o the object to dispatch
         * @throws InterruptedException if queueing or scheduling is interrupted
         */
        public void add(Object o) throws InterruptedException {
            int t = this.size.increment();
            this.queue.put(o);
            if (t == 1) {
                this.executor.execute(this);
            }
        }

        /**
         * Drains the queue, passing each object to the listener's
         * {@code onObject}, and repeats until the pending counter is no longer
         * positive. If interrupted while polling, the interrupt status is
         * restored and the method returns; objects already handled in the
         * current pass are still subtracted from the counter.
         */
        public synchronized void run() {
            int t = this.size.get();
            while (t > 0) {
                int count = 0;
                try {
                    Object o;
                    while ((o = this.queue.poll(this.pollDelay)) != null) {
                        ++count;
                        this.objectListener.onObject(o);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finally {
                    // Runs on both the normal and the interrupted path.
                    t = this.size.subtract(count);
                }
            }
        }
    }
}

