Witajcie!
Piszę prosty serwer obsługujący z góry narzuconą ilość jednoczesnych połączeń. Ma ich być docelowo mało: dwa lub trzy.
Mój serwer to prosty Singleton. Problem mam z klasami, które miałyby implementować jednoczesną obsługę klientów.
Korzystając z semafora nadaję uprawnienia do połączenia lub je odrzucam. Chciałbym, aby każdy klient miał swój wątek działający asynchronicznie, jednocześnie pozostawiając możliwość serwerowi kontroli nad nimi. Myślałem o tym, żeby każde połączenie było reprezentowane przez Future, dzięki czemu miałbym możliwość wywołania w razie czego cancel(true), które z użyciem nieludzkiej siły zakończyłoby komunikację. Problem polega w implementacji. Wpadłem na pomysł, aby stworzyć 2 klasy:

  • klasa zewnętrzna nadzorująca połączenie (z referencją do semafora, aby po zakończeniu komunikacji wywołać semaphore.release())
  • klasa wewnętrzna odbierająca dane (na ten czas)

Pytanie brzmi: która z nich musiałaby implementować implementować interfejs Runnable, żeby nie zatrzymać mi głównego wątku serwera? Czy obydwie powinny go implementować, aby móc jednocześnie odbierać dane i mieć możliwość ubicia komunikacji?

Już tyle razy usuwałem wszystko i próbowałem pisać od nowa, że potrzebuję pomocy.

Serwer:

package application.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Server {

    private int numberOfClients = 2;
    private int currentClientNumber = 0;
    private static Server server = null;
    private Semaphore semaphore = new Semaphore(numberOfClients);
    private ExecutorService executorService = Executors.newCachedThreadPool();
    private ConnectionManager[] clients = new ConnectionManager[numberOfClients];

    public static Server getInstance(int port) {
        if (server == null) {
            server = new Server(port);
        }
        return server;
    }

    private Server(int port) {
        startServer(port);
    }

    private void startServer(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port))
        {
            Socket socket;
            while (true) {
                socket = serverSocket.accept();
                if (isSessionAvailable()) {
                    semaphore.acquire();
                    clients[currentClientNumber++] = new ConnectionManager(socket, executorService, semaphore);
                    executorService.execute(clients[currentClientNumber-1]);
                } else {
                    socket.getOutputStream().write("Too many clients!".getBytes());
                    socket.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private boolean isSessionAvailable() {
        return semaphore.availablePermits() > 0;
    }
}

ConnectionManager:

package application.server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public class ConnectionManager implements Runnable{

    private final Semaphore semaphore;
    private final Socket socket;
    private final ExecutorService executorService;
    private Future connection;

    public ConnectionManager(Socket socket, ExecutorService executorService, Semaphore semaphore) {
        this.socket = socket;
        this.executorService = executorService;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        startConnection();
    }

    public void startConnection() {
        connection = executorService.submit(new Connection());
        try {
            connection.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public boolean killConnection() {
        connection.cancel(true);
        return connection.isCancelled();
    }


    class Connection implements Runnable{

        @Override
        public void run() {
            try (
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))
                    )
            {
                String input;
                while (!(input = reader.readLine()).equals("exit")) {
                    System.out.println(input);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Client:

package application.client;

import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {

    final static Logger logger = Logger.getLogger(Client.class);

    private int port;
    private String host;

    public Client(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void startClientSession() {
        try (
                Socket socket = new Socket(host, port);
                PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                )
        {
            logger.info("Started client service");
            String line;
            while (!(line = in.readLine()).equals("exit")) {
                out.write(line);
            }
            logger.info("Ended client service");
        } catch (UnknownHostException e) {
            logger.error("The host is unknown!", e);
        } catch (IOException e) {
            logger.error("Caught I/O exception", e);
        }
    }

}

Przykładowe wywołanie czegoś takiego daje:

hihellohi all!
java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at application.server.ConnectionManager.startConnection(ConnectionManager.java:33)
	at application.server.ConnectionManager.run(ConnectionManager.java:27)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at application.server.ConnectionManager$Connection.run(ConnectionManager.java:58)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more

screenshot-20170824202427.png