package jasync;

import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.io.EOFException;
import java.io.InputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StreamCorruptedException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Encapsulates the details required for a client to communicate with a server.
 * Once established, communication is done entirely through the queues.
 */
public class ClientServerConnection extends Thread {
    /**
     * Provides output serialization. This should only be touched by
     * outputThread.
     */
    protected ObjectOutputStream oOutput;

    /**
     * Provides input serialization. This should only be touched by
     * inputThread.
     */
    protected ObjectInputStream oInput;

    /**
     * Handles deserialization. This thread should be handled by the
     * ClientServerConnection instance.
     */
    protected InputThread inputThread;

    /**
     * Handles serialization. This thread should be handled by the
     * ClientServerConnection instance.
     */
    protected OutputThread outputThread;

    /**
     * Messages in this queue are sent by outputThread.
     * @see #getOutQueue()
     */
    protected BlockingQueue<Message> outQueue;

    /**
     * Messages recieved from the network are placed in this queue for use by
     * the application
     * @see #getInQueue()
     */
    protected BlockingQueue<Message> inQueue;

    /**
     * The socket connection. This should not be used directly.
     */
    protected Socket socket;

    /**
     * The Peer instance for the other endpoint of the connection.
     * @see #getPeer()
     */
    protected Peer peer;

    /**
     * The Peer instance for this end of the connection.
     */
    protected Peer self;

    /**
     * True when the handshake has taken place and the connection is ready to
     * recieve/transmit message. Should not be used directly, only used to
     * allow start() to block.
     */
    protected boolean initialized = false;

    /**
     * The MessageFactory instance. Should be used whenever possible to create
     * Message objects, as it tracks ID numbers and other information.
     * @see #getFactory()
     */
    protected MessageFactory factory;

    /**
     * Constructor.
     * @param socket A connected Socket instance.
     * @param self A Peer instance to negotiate a connection with the other
     * side.
     */
    public ClientServerConnection (Socket socket, Peer self)
	throws IOException {
	inQueue = new LinkedBlockingQueue<Message>();
	outQueue = new LinkedBlockingQueue<Message>();
	
	this.socket = socket;
	this.self = self;
    }
    
    /**
     * Disconnects the socket, implicitly terminates all handler threads.
     */
    public void disconnect () {
	try {
	    socket.close();
	}
	catch (Throwable e) {
	    e.printStackTrace();
	}
    }

    /**
     * Returns the input queue.
     * @return inputQueue
     */       
    public BlockingQueue<Message> getInQueue () {
	return inQueue;
    }
   
    /** 
     * Returns the output queue.
     * @return outputQueue
     */
    public BlockingQueue<Message> getOutQueue () {
	return outQueue;
    }
    
    /**
     * Returns the MessageFactory instance for this object.
     * @return the MessageFactoryty instance for this object.
     */
    public MessageFactory getFactory () {
	return factory;
    }

    /** 
     * Returns the Peer instance for the other side of the connection.
     * @return Peer instance of the other side of the connection.
     */
    public Peer getPeer () {
	return peer;
    }

    /** 
     * Starts the master thread for this connection AND InputThread and
     * OutputThread helpers. Note that this thread will perform the handshake.
     * Returns only when the handshake is complete.
     */
    public void start () {
	super.start();

	synchronized (this) {
	    if (!initialized) {
		try {
		    this.wait();
		}
		catch (InterruptedException e) {
		    e.printStackTrace();
		}
	    }
	}
    }

    /**
     * Handlers initialization of the connection, starts helper threads,
     * notifies the thread that called start() because that blocks until
     * success.
     */
    public void run () {
	int handshakeID;
	Message temp = null;
	
	factory = new MessageFactory(self);
	
	try {
	    oOutput = new ObjectOutputStream(socket.getOutputStream());

	    /* handshake */
	    temp = factory.makeHandshakeMessage();
	    handshakeID = temp.getUniqueID(); // we need to remmber of the ack
	    oOutput.writeObject(temp);
	    
	    oInput = new ObjectInputStream(socket.getInputStream());
	    try {
		temp = (Message) oInput.readObject();
	    }
	    catch (ClassNotFoundException e) {
		this.notifyAll();
		e.printStackTrace();
		return;
	    }
	    
	    /* if there's something wrong with the handshake... */
	    if (temp.getUniqueID() != handshakeID ||
		temp.getType() != MessageType.PUSH_ACK ||
		temp.getSync() != MessageSync.SYNC ||
		temp.getStatus() != MessageStatus.OK ||
		temp.getCommand() != MessageCommand.HANDSHAKE ||
		temp.getData().length < 1 ||
		!(temp.getData()[0] instanceof Peer)) {
		/* Seems to be a serialized object, but not the right one. */
		Debug.debug("Protocol mismatch with " +
			    socket.getRemoteSocketAddress() +
			    ". Disconnecting.");
		socket.close();
		this.notifyAll();
		return;
	    }

	    peer = (Peer) temp.getData()[0]; // pulls peer from handshake msg
	    
	    outputThread = new OutputThread(peer, oOutput, outQueue);
	    inputThread = new InputThread(peer, outputThread, oInput, inQueue);
	    outputThread.start();	    
	    inputThread.start();

	    /* we're initialized. Someone is waiting for that. */
	    synchronized (this) {
		this.notifyAll();
	    }

	    /* connect pseudo-message */
	    inQueue.put(MessageFactory.makeAsyncConnectMessage());
	    
	    outputThread.join();
	    inputThread.join();

	    socket.close();

	    /* disconnect pseudo-message */
	    inQueue.put(MessageFactory.makeAsyncDisconnectMessage());
	}
	catch (StreamCorruptedException e) {
	    this.notifyAll();

	    Debug.debug("Protocol mismatch with " +
                        socket.getRemoteSocketAddress() + ". Disconnecting.");

	    try {
		socket.close();
	    }
	    catch (IOException f) {
		e.printStackTrace();
	    }
	}
	catch (EOFException e) {
	    this.notifyAll();

	    Debug.debug("Protocol mismatch with " +
                        socket.getRemoteSocketAddress() + ". Disconnecting.");

	    try {
		socket.close();
	    }
	    catch (IOException f) {
		e.printStackTrace();
	    }
	}
	catch (IOException e) {
	    this.notifyAll();

	    e.printStackTrace();
	}
	catch (InterruptedException e) {
	    this.notifyAll();

	    e.printStackTrace();
	}
	catch (Throwable e) {
	    this.notifyAll();

	    e.printStackTrace();
	}
    }
}

