package jasync;

import java.net.Socket;
import java.net.SocketException;
import java.io.EOFException;
import java.io.InputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StreamCorruptedException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Handles a connection from a client.
 * @see ClientServerConnection
 * @see ServerListener
 */
class ServerConnectionHandler extends Thread {
    /**
     * Incoming queue. {@link #run()} puts the connect and disconnect
     * notifications here and hands the queue to the InputThread.
     */
    protected BlockingQueue<Message> inQueue;

    /**
     * Outgoing queue. Handed to the OutputThread by {@link #run()}.
     */
    protected BlockingQueue<Message> outQueue;

    /**
     * Socket connection to the remote peer.
     */
    protected Socket socket;

    /**
     * The remote peer, taken from the handshake message. Null until the
     * handshake has been accepted.
     */
    protected Peer peer;

    /**
     * Local information.
     */
    protected Peer self;

    /**
     * Set when the handshake is complete. Only read and written while holding
     * this object's monitor.
     */
    protected boolean initialized = false;

    /**
     * From the socket. Don't touch this directly.
     */
    protected ObjectInputStream oInput;

    /**
     * From the socket. Don't touch this directly.
     */
    protected ObjectOutputStream oOutput;

    /**
     * Input handler thread.
     */
    protected InputThread inputThread;

    /**
     * Output handler thread.
     */
    protected OutputThread outputThread;

    /**
     * Creates message objects. Null until the handshake has been accepted.
     */
    protected MessageFactory factory;

    /**
     * Constructor.
     * @param socket A connected socket.
     * @param self The local peer information.
     */
    public ServerConnectionHandler (Socket socket, Peer self) {
        outQueue = new LinkedBlockingQueue<Message>();
        inQueue = new LinkedBlockingQueue<Message>();
        this.socket = socket;
        this.self = self;
    }

    /**
     * Marks the connection as initialized and wakes every thread blocked in
     * {@link #waitInitialized()}. Should only be called when the handshake is
     * complete and the queues are ready to transport messages.
     */
    protected synchronized void setInitialized () {
        /* One-way transition: nothing in this class resets the flag. */
        initialized = true;
        this.notifyAll();
    }

    /**
     * Tests if the connection is initialized.
     * @return true if the connection is ready to accept messages.
     */
    public synchronized boolean isInitialized () {
        return initialized;
    }

    /**
     * Blocks until the connection is initialized.
     *
     * Note that {@link #setInitialized()} is the only place waiters are
     * notified, and {@link #run()} only calls it after a successful
     * handshake; if the handshake fails the caller stays blocked until it is
     * interrupted.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting.
     */
    public synchronized void waitInitialized () throws InterruptedException {
        /* Loop rather than test once: wait() may return spuriously. */
        while (!initialized) {
            this.wait();
        }
    }

    /**
     * Drops the connection cleanly. Implicitly terminates the InputThread and
     * OutputThread worker threads.
     */
    public void disconnect () {
        try {
            socket.close();
        }
        catch (Throwable e) {
            /* Best effort: any failure is reported but never propagated. */
            e.printStackTrace();
        }
    }

    /**
     * Returns the input queue
     * @return the input queue
     */
    public BlockingQueue<Message> getInQueue () {
        return inQueue;
    }

    /**
     * Returns the output queue
     * @return the output queue
     */
    public BlockingQueue<Message> getOutQueue () {
        return outQueue;
    }

    /**
     * Returns the message factory
     * @return the message factory, or null if the handshake has not been
     *         accepted yet
     * @see MessageFactory
     */
    public MessageFactory getFactory () {
        return factory;
    }

    /**
     * Returns the remote peer
     * @return the remote peer, or null if the handshake has not been accepted
     *         yet
     * @see Peer
     */
    public Peer getPeer () {
        return peer;
    }

    /**
     * Initializes the connection with the other peer, starts the InputThread
     * and OutputThread helpers and waits for both of them to finish.
     *
     * A handshake that cannot be read or does not have the expected shape is
     * logged as a protocol mismatch and the connection is dropped without
     * queueing any message. The socket is closed on every exit path.
     */
    @Override
    public void run () {
        try {
            /* NOTE(review): this is native Java deserialization of data read
               straight from the socket. If remote peers can be untrusted,
               restrict the accepted classes or change the wire format. */
            oInput = new ObjectInputStream(socket.getInputStream());
            Object received = oInput.readObject();

            if (!isValidHandshake(received)) {
                /* Seems to be a serialized object, but not the right one.
                   Stop here so that no connect message is queued for a peer
                   that was rejected. */
                reportProtocolMismatch();
                return;
            }

            Message handshake = (Message) received;
            peer = (Peer) handshake.getData()[0];
            factory = new MessageFactory(self);

            inQueue.put(MessageFactory.makeAsyncConnectMessage());

            oOutput = new ObjectOutputStream(socket.getOutputStream());
            oOutput.writeObject(factory.makeHandshakeMessageAck(handshake));

            setInitialized();

            outputThread = new OutputThread(peer, oOutput, outQueue);
            inputThread = new InputThread(peer, outputThread, oInput, inQueue);
            outputThread.start();
            inputThread.start();

            outputThread.join();
            inputThread.join();

            /* Close before announcing the disconnect; a failure here skips
               the disconnect message and is reported below. */
            socket.close();

            inQueue.put(MessageFactory.makeAsyncDisconnectMessage());
        }
        catch (StreamCorruptedException e) {
            /* Someone is typing gibberish at the keyboard to see if I test
               for this. */
            reportProtocolMismatch();
        }
        catch (EOFException e) {
            reportProtocolMismatch();
        }
        catch (ClassNotFoundException e) {
            reportProtocolMismatch();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            /* Restore the interrupt status that the blocking call cleared. */
            Thread.currentThread().interrupt();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            /* Closing an already closed socket has no effect, so this is safe
               after the explicit close on the normal path. */
            closeSocket();
        }
    }

    /**
     * Checks that the first object read from the peer is a well-formed
     * handshake: a PUSH / SYNC / OK / HANDSHAKE message whose data holds
     * exactly one Peer.
     * @param candidate the object read from the socket; may be null.
     * @return true if the object can be used as a handshake.
     */
    private static boolean isValidHandshake (Object candidate) {
        if (!(candidate instanceof Message)) {
            return false;
        }

        Message handshake = (Message) candidate;

        return handshake.getType() == MessageType.PUSH
            && handshake.getSync() == MessageSync.SYNC
            && handshake.getStatus() == MessageStatus.OK
            && handshake.getCommand() == MessageCommand.HANDSHAKE
            && handshake.getData() != null
            && handshake.getData().length == 1
            && handshake.getData()[0] instanceof Peer;
    }

    /**
     * Logs that the remote end did not speak the expected protocol.
     */
    private void reportProtocolMismatch () {
        Debug.debug("Protocol mismatch with " +
                    socket.getRemoteSocketAddress() + ". Disconnecting.");
    }

    /**
     * Closes the socket, reporting (but not propagating) a failure to do so.
     */
    private void closeSocket () {
        try {
            socket.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
}

