package jasync;

import java.io.EOFException;
import java.io.ObjectInputStream;
import java.net.SocketException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Helper class that deserializes input from a socket and eunqueues it
 * @see OutputThread
 */
public class InputThread extends Thread {
    /**
     * Input queue. Objects read from the network are placed in here.
     */
    protected BlockingQueue<Message> inQueue;

    /**
     * From the socket.
     */
    protected ObjectInputStream oInput;

    /**
     * Remote peer.
     */
    protected Peer peer;

    /**
     * The thread responsible for output. This thread may interrupt it.
     */
    protected Thread outputThread;

    /**
     * Constructor.
     * @param peer Peer instance from the handshake.
     * @param outputThread OutputThread instance to be used in conjunction.
     * @param oInput ObjectOutputStream created from the socket.
     * @param inQueue Queue where incoming messages will be placed.
     * @see OutputThread
     */
    public InputThread (Peer peer, Thread outputThread,
			ObjectInputStream oInput,
			BlockingQueue<Message> inQueue) {
	this.peer = peer;
	this.outputThread = outputThread;
	this.oInput = oInput;
	this.inQueue = inQueue;
    }


    /**
     * Blocks on input, enqueues incoming messages.
     */
    public void run () {
	try {
	    while (true) {
		inQueue.put((Message) oInput.readObject());
	    }
	}
	catch (SocketException e) {
	    /* local disconnect */
	    outputThread.interrupt();
	    return;
	}
	catch (EOFException e) {
	    /* remote disconnect */
	    outputThread.interrupt();
	    return;
	}
	catch (Throwable e) {
	    /* basically anything that happens is cause for a disconnect */
	    e.printStackTrace();
	    return;
	}
    }
}

