We have several problems here:
- We can't simply wrap SocketFactories around each other, like we can do for InputStreams and OutputStreams.
- Java's zlib-based DeflaterOutputStream does not implement flushing.
I think I found a mechanism that seems to work.
This will be a multi-part series, as it takes some time to write.
(You can find the source code of the completed parts in my GitHub repository.)
A custom SocketImpl
A Socket is always backed by an object implementing SocketImpl. Thus, having a custom socket in fact means using a custom SocketImpl class. Here is an implementation based on a pair of streams (and a base socket, for closing purposes):
/**
* A SocketImpl implementation which works on a pair
* of streams.
*
* An instance of this class represents an already
* connected socket, thus all the methods relating to
* connecting, accepting and such are not implemented.
*
* The implemented methods are {@link #getInputStream},
* {@link #getOutputStream}, {@link #available} and the
* shutdown methods {@link #close}, {@link #shutdownInput},
* {@link #shutdownOutput}.
*/
private static class WrappingSocketImpl extends SocketImpl {
private InputStream inStream;
private OutputStream outStream;
private Socket base;
WrappingSocketImpl(StreamPair pair, Socket base) {
this.inStream = pair.input;
this.outStream = pair.output;
this.base = base;
}
A StreamPair is a simple data holder class, see below.
These are the important methods:
protected InputStream getInputStream() {
return inStream;
}
protected OutputStream getOutputStream() {
return outStream;
}
protected int available() throws IOException {
return inStream.available();
}
Then some methods to allow closing. These are not really tested (maybe we should also close or at least flush the streams?), but it seems to work for our RMI usage.
protected void close() throws IOException {
base.close();
}
protected void shutdownInput() throws IOException {
base.shutdownInput();
// TODO: inStream.close() ?
}
protected void shutdownOutput() throws IOException {
base.shutdownOutput();
// TODO: outStream.close()?
}
The next few methods will be called by the Socket constructor (or indirectly by something in the RMI engine), but don't really have to do anything.
protected void create(boolean stream) {
if(!stream) {
throw new IllegalArgumentException("datagram socket not supported.");
}
}
public Object getOption(int optID) {
System.err.println("getOption(" + optID + ")");
return null;
}
public void setOption(int optID, Object value) {
// noop, as we don't have any options.
}
None of the remaining methods are necessary, so we implement them to throw exceptions (this way we will notice if that assumption was wrong).
// unsupported operations
protected void connect(String host, int port) {
System.err.println("connect(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(InetAddress address, int port) {
System.err.println("connect(" + address + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(SocketAddress addr, int timeout) {
System.err.println("connect(" + addr + ", " + timeout + ")");
throw new UnsupportedOperationException();
}
protected void bind(InetAddress host, int port) {
System.err.println("bind(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void listen(int backlog) {
System.err.println("listen(" + backlog + ")");
throw new UnsupportedOperationException();
}
protected void accept(SocketImpl otherSide) {
System.err.println("accept(" + otherSide + ")");
throw new UnsupportedOperationException();
}
protected void sendUrgentData(int data) {
System.err.println("sendUrgentData()");
throw new UnsupportedOperationException();
}
}
Here is the StreamPair used by the constructor:
/**
* A simple holder class for a pair of streams.
*/
public static class StreamPair {
public InputStream input;
public OutputStream output;
public StreamPair(InputStream in, OutputStream out) {
this.input = in; this.output = out;
}
}
Next part: use this to implement a Socket factory.
A Socket factory, wrapping another one.
We are dealing here with RMI socket factories (i.e. RMIClientSocketFactory, RMIServerSocketFactory, RMISocketFactory in java.rmi.server), but the same idea applies to other libraries using a socket factory interface as well. Examples are javax.net.SocketFactory (and ServerSocketFactory), Apache Axis' SocketFactory, JSch's SocketFactory.
Often, the idea of these factories is that they somehow connect to another server than the original one (a proxy), then do some negotiating, and afterwards either can simply continue in the same connection or have to tunnel the real connection through some other protocol (using wrapping streams). We instead want to let some other socket factory do the original connecting, and then do only the stream wrapping ourselves.
RMI has separate interfaces for the client and server socket factories. The client socket factories will be serialized and passed from the server to the client together with the remote stubs, allowing the client to reach the server.
There is also an RMISocketFactory abstract class implementing both interfaces, and providing a VM-global default socket factory which will be used for all remote objects which don't have their own ones.
We will now implement a subclass of this class (thereby also implementing both interfaces), allowing the user to give a base client and server socket factory, which we will then use. Our class must be serializable to allow passing it to the clients.
/**
* A base class for RMI socket factories which do their
* work by wrapping the streams of Sockets from another
* Socket factory.
*
* Subclasses have to override the {@link #wrap} method.
*
* Instances of this class can be used as both client and
* server socket factories, or as only one of them.
*/
public abstract class WrappingSocketFactory
extends RMISocketFactory
implements Serializable
{
(Imagine all the rest indented relative to this class.)
As we want to refer to other factories, here are the fields.
/**
* The base client socket factory. This will be serialized.
*/
private RMIClientSocketFactory baseCFactory;
/**
* The base server socket factory. This will not be serialized,
* since the server socket factory is used only on the server side.
*/
private transient RMIServerSocketFactory baseSFactory;
These will be initialized by straightforward constructors (which I don't repeat here - look at the github repository for the full code).
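The effect of the transient modifier on the server factory field can be seen in isolation. The following is only a minimal sketch: the class Holder and its two string fields are hypothetical stand-ins for the factory and its two base factories, and plain Java serialization to a byte array stands in for what happens when the factory travels to the client together with a stub.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    /** Hypothetical stand-in for the factory: one serialized and one transient field. */
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        String clientPart;            // travels with the serialized object
        transient String serverPart;  // skipped by serialization

        Holder(String clientPart, String serverPart) {
            this.clientPart = clientPart;
            this.serverPart = serverPart;
        }
    }

    /** Serializes the holder into a byte array and reads a copy back. */
    static Holder roundTrip(Holder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(original);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                     new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder copy = roundTrip(new Holder("client factory", "server factory"));
        System.out.println("clientPart: " + copy.clientPart);
        System.out.println("serverPart: " + copy.serverPart);
    }
}
```

On the receiving side the transient field is null, so any code using it has to cope with a missing server factory.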
Abstract wrap method
To keep this "wrapping of socket factories" general, we implement only the general mechanism here, and do the actual wrapping of the streams in subclasses. Then we can have a compressing/decompressing subclass, an encrypting one, a logging one, etc.
Here we only declare the wrap method:
/**
* Wraps a pair of streams.
* Subclasses must implement this method to do the actual
* work.
* @param input the input stream from the base socket.
* @param output the output stream to the base socket.
* @param server if true, we are constructing a socket in
* {@link ServerSocket#accept}. If false, this is a pure
* client socket.
*/
protected abstract StreamPair wrap(InputStream input,
OutputStream output,
boolean server);
This method (and the fact that Java doesn't allow multiple return values) is the reason for the StreamPair class. Alternatively we could have two separate methods, but in some cases (as for SSL) it is necessary to know which two streams are paired.
Client Socket Factory
Now, let's have a look at the client socket factory implementation:
/**
* Creates a client socket and connects it to the given host/port pair.
*
* This retrieves a socket to the host/port from the base client
* socket factory and then wraps a new socket (with a custom SocketImpl)
* around it.
* @param host the host we want to be connected with.
* @param port the port we want to be connected with.
* @return a new Socket connected to the host/port pair.
* @throws IOException if something goes wrong.
*/
public Socket createSocket(String host, int port)
throws IOException
{
Socket baseSocket = baseCFactory.createSocket(host, port);
We retrieve a socket from our base factory, and then ...
StreamPair streams = this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
false);
... wrap its streams by new streams. (This wrap has to be implemented by subclasses, see below).
SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);
Then we use these streams to create our WrappingSocketImpl (see above), and pass it ...
return new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
};
... to a new Socket. We have to subclass Socket because this constructor is protected, but this is opportune since we also have to override the isConnected method to return true instead of false. (Remember, our SocketImpl is already connected, and does not support connecting.)
}
For client socket factories, this is already enough. For server socket factories, it gets a bit more complicated.
Wrapping ServerSockets
There seems to be no way to create a ServerSocket with a given SocketImpl object - it always uses the static SocketImplFactory. Thus we now subclass ServerSocket, simply ignoring its SocketImpl, instead delegating to another ServerSocket.
/**
* A server socket subclass which wraps our custom sockets around the
* sockets retrieved by a base server socket.
*
* We only override enough methods to work. Basically, this is
* an unbound server socket, which handles {@link #accept} specially.
*/
private class WrappingServerSocket extends ServerSocket {
private ServerSocket base;
public WrappingServerSocket(ServerSocket b)
throws IOException
{
this.base = b;
}
It turns out we have to implement getLocalPort, since this number is sent with the remote stub to the clients.
/**
* returns the local port this ServerSocket is bound to.
*/
public int getLocalPort() {
return base.getLocalPort();
}
The next method is the important one. It works similarly to our createSocket() method above.
/**
* accepts a connection from some remote host.
* This will accept a socket from the base socket, and then
* wrap a new custom socket around it.
*/
public Socket accept() throws IOException {
We let the base ServerSocket accept a connection, then wrap its streams:
final Socket baseSocket = base.accept();
StreamPair streams =
WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
true);
Then we create our WrappingSocketImpl, ...
SocketImpl wrappingImpl =
new WrappingSocketImpl(streams, baseSocket);
... and create another anonymous subclass of Socket:
// For some reason, this seems to work only as an
// anonymous direct subclass of Socket, not as an
// external subclass. Strange.
Socket result = new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
public boolean isBound() { return true; }
public int getLocalPort() {
return baseSocket.getLocalPort();
}
public InetAddress getLocalAddress() {
return baseSocket.getLocalAddress();
}
};
This one needs some more overridden methods, as these are called by the RMI engine, it seems.
I tried to put these in a separate (non-local) class, but this did not work (gave exceptions at the client side on connecting). I have no idea why. If someone has an idea, I'm interested.
return result;
}
}
Having this ServerSocket subclass, we can complete our ...
wrapping RMI server socket factory
/**
* Creates a server socket listening on the given port.
*
* This retrieves a ServerSocket listening on the given port
* from the base server socket factory, and then creates a
* custom server socket, which on {@link ServerSocket#accept accept}
* wraps new Sockets (with a custom SocketImpl) around the sockets
* from the base server socket.
* @param port the port the server socket should listen on.
* @return a new ServerSocket listening on the given port.
* @throws IOException if something goes wrong.
*/
public ServerSocket createServerSocket(int port)
throws IOException
{
final ServerSocket baseSocket = getSSFac().createServerSocket(port);
ServerSocket ss = new WrappingServerSocket(baseSocket);
return ss;
}
Not much to say, it all is already in the comment. Yes, I know I could do this all in one line. (There originally were some debugging outputs between the lines.)
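The helper getSSFac() called above is not shown in this post. As a guess at what such a helper might do (an assumption, not the repository code), here is a sketch that returns the configured base server factory if there is one and otherwise falls back to the VM-wide RMI socket factory. The class and method names ServerFactoryFallback and orDefault are made up for illustration; RMISocketFactory.getSocketFactory() and RMISocketFactory.getDefaultSocketFactory() are the standard static methods.

```java
import java.rmi.server.RMIServerSocketFactory;
import java.rmi.server.RMISocketFactory;

public class ServerFactoryFallback {
    /**
     * Returns the given base factory, or a VM-wide fallback if it is null.
     * (Hypothetical helper, only a guess at what getSSFac() could look like.)
     */
    static RMIServerSocketFactory orDefault(RMIServerSocketFactory base) {
        if (base != null) {
            return base;
        }
        // The factory installed with RMISocketFactory.setSocketFactory(), if any.
        RMISocketFactory global = RMISocketFactory.getSocketFactory();
        if (global != null) {
            return global;
        }
        // Otherwise the built-in default implementation of the RMI runtime.
        return RMISocketFactory.getDefaultSocketFactory();
    }

    public static void main(String[] args) {
        System.out.println(orDefault(null));
    }
}
```

The same pattern would apply to the client factory, which may also be null after deserialization on the client side.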
Let's finish the class:
}
Next time: a tracing socket factory.
A tracing socket factory.
To test our wrapping and see if there are enough flushes, here is the wrap method of a first subclass:
protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
InputStream wrappedIn = in;
OutputStream wrappedOut = new FilterOutputStream(out) {
public void write(int b) throws IOException {
System.err.println("write(.)");
super.write(b);
}
public void write(byte[] b, int off, int len)
throws IOException {
System.err.println("write(" + len + ")");
super.out.write(b, off, len);
}
public void flush() throws IOException {
System.err.println("flush()");
super.flush();
}
};
return new StreamPair(wrappedIn, wrappedOut);
}
The input stream is used as is, the output stream simply adds some logging.
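One detail in this wrapper is easy to miss: the array version of write passes the chunk directly to the underlying stream instead of calling super.write(b, off, len). The default FilterOutputStream implementation loops over write(int), so the trace would show one line per byte. The sketch below shows the difference on a ByteArrayOutputStream; the class TracingDemo, the list used as a log, and the flag overrideArrayWrite are illustrative assumptions, not part of the original factory.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class TracingDemo {
    /** Wraps the target so that every call is recorded in the given list. */
    static OutputStream trace(OutputStream target, final List<String> log,
                              final boolean overrideArrayWrite) {
        return new FilterOutputStream(target) {
            @Override public void write(int b) throws IOException {
                log.add("write(.)");
                super.write(b);
            }
            @Override public void write(byte[] b, int off, int len) throws IOException {
                if (overrideArrayWrite) {
                    log.add("write(" + len + ")");
                    out.write(b, off, len);   // hand the chunk over in one piece
                } else {
                    super.write(b, off, len); // default: one write(int) per byte
                }
            }
            @Override public void flush() throws IOException {
                log.add("flush()");
                super.flush();
            }
        };
    }

    /** Writes three bytes and flushes once, returning the recorded calls. */
    static List<String> traceOf(boolean overrideArrayWrite) {
        List<String> log = new ArrayList<>();
        OutputStream stream = trace(new ByteArrayOutputStream(), log, overrideArrayWrite);
        try {
            stream.write(new byte[] {1, 2, 3}, 0, 3);
            stream.flush();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(traceOf(true));   // [write(3), flush()]
        System.out.println(traceOf(false));  // [write(.), write(.), write(.), flush()]
    }
}
```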
On the server side, it looks like this (the [example] prefix comes from ant):
[example] write(14)
[example] flush()
[example] write(287)
[example] flush()
[example] flush()
[example] flush()
[example] write(1)
[example] flush()
[example] write(425)
[example] flush()
[example] flush()
We see that there are enough flushes, even more than enough. (The numbers are the lengths of the output chunks.)
(On client side, this actually throws a java.rmi.NoSuchObjectException. It worked before ... no idea why it doesn't work now. As the compressing example does work and I'm tired, I'll not search for it now.)
Next: compressing.
Flushing compressed streams
For compression, Java has some classes in the java.util.zip package. There is the pair DeflaterOutputStream / InflaterInputStream, which implement the deflate compression algorithm by wrapping another stream, filtering the data through a Deflater or Inflater, respectively. Deflater and Inflater are based on native methods calling the common zlib library. (Actually, the streams could also support other algorithms, if someone provided subclasses with alternate implementations of Deflater and Inflater.)
(There are also DeflaterInputStream and InflaterOutputStream, which work the other way around.)
Based on this, GZIPOutputStream and GZIPInputStream implement the GZip file format. (This mainly adds a header, a footer and a checksum.)
Both output streams have the problem (for our use case) that they don't truly support flush(). This is caused by a deficiency in the API definition of Deflater, which is allowed to buffer as much data as it wants until the final finish(). Zlib allows flushing its state; it's just the Java wrapper that is too stupid to expose this.
There is bug #4206909 open about this since January 1999, and it looks like it is finally fixed for Java 7, hurray! If you have Java 7, you can simply use DeflaterOutputStream here (created with its syncFlush constructor argument set to true).
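For readers on Java 7 or later, here is a minimal sketch of what the fixed API looks like, assuming the DeflaterOutputStream(OutputStream, boolean syncFlush) constructor. The class SyncFlushDemo and its method are made up for this illustration: it writes a short text, flushes without closing, and tries to inflate whatever has reached the underlying stream so far.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;

public class SyncFlushDemo {
    /**
     * Compresses the text, calls flush() (but not close()), and returns
     * what a reader could decompress from the bytes written so far.
     */
    static String readBackAfterFlush(String text, boolean syncFlush) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DeflaterOutputStream out = new DeflaterOutputStream(wire, syncFlush);
            out.write(text.getBytes(StandardCharsets.UTF_8));
            out.flush();                        // the "connection" stays open
            byte[] soFar = wire.toByteArray();  // what the other side has received
            out.close();                        // only to release the Deflater

            Inflater inflater = new Inflater();
            inflater.setInput(soFar);
            byte[] buffer = new byte[text.length() * 4 + 16];
            int n = inflater.inflate(buffer);
            inflater.end();
            return new String(buffer, 0, n, StandardCharsets.UTF_8);
        } catch (IOException | DataFormatException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("syncFlush=true:  '" + readBackAfterFlush("hello hello", true) + "'");
        System.out.println("syncFlush=false: '" + readBackAfterFlush("hello hello", false) + "'");
    }
}
```

With syncFlush set to true the reader gets the complete text after the flush. Without it, a short message typically stays in the deflater's internal buffer, which is exactly the problem for a request/response protocol like RMI.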
Since I don't have Java 7 yet, I'll use the workaround posted in the bug comments on 23-JUN-2002 by rsaddey.
/**
* Workaround for the broken GZIPOutputStream, from
* https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see DecompressingInputStream
*/
public class CompressingOutputStream
extends DeflaterOutputStream {
public CompressingOutputStream (final OutputStream out)
{
super(out,
// Using Deflater with nowrap == true will omit headers
// and trailers
new Deflater(Deflater.DEFAULT_COMPRESSION, true));
}
private static final byte [] EMPTYBYTEARRAY = new byte[0];
/**
* Ensure all remaining data will be output.
*/
public void flush() throws IOException {
/**
* Now this is tricky: We force the Deflater to flush
* its data by switching compression level.
* As yet, a perplexingly simple workaround for
* http://developer.java.sun.com/developer/bugParade/bugs/4255743.html
*/
def.setInput(EMPTYBYTEARRAY, 0, 0);
def.setLevel(Deflater.NO_COMPRESSION);
deflate();
def.setLevel(Deflater.DEFAULT_COMPRESSION);
deflate();
out.flush();
}
/**
* We also end the Deflater (which we created ourselves) when
* we are done.
*/
public void close()
throws IOException
{
super.close();
def.end();
}
} // class
/**
* Workaround for the broken GZIPOutputStream, from
* https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see CompressingOutputStream
*/
public class DecompressingInputStream extends InflaterInputStream {
public DecompressingInputStream (final InputStream in) {
// Using Inflater with nowrap == true will omit headers and trailers
super(in, new Inflater(true));
}
/**
* available() should return the number of bytes that can be read without
* running into blocking wait. Accomplishing this feat would eventually
* require to pre-inflate a huge chunk of data, so we rather opt for a
* more relaxed contract (java.util.zip.InflaterInputStream does not
* fit the bill).
* This code has been tested to work with BufferedReader.readLine();
*/
public int available() throws IOException {
if (!inf.finished() && !inf.needsInput()) {
return 1;
} else {
return in.available();
}
}
/**
* We also end the Inflater (which we created ourselves) when
* we are done.
*/
public void close()
throws IOException
{
super.close();
inf.end();
}
} //class
(These are in the de.fencing_game.tools package in my GitHub repository. The version there has some German comments, since I originally copied this a year ago for another project of mine.)
Searching a bit on Stack Overflow, I found this answer by BalusC to a related question, which offers another compressing OutputStream with optimized flushing. I did not test it, but it might be an alternative to this one. (It uses the gzip format, while we are using the pure deflate format here. Make sure the writing and the reading stream fit together.)
Another alternative would be using JZlib, as bestsss proposed, with its ZOutputStream and ZInputStream. It does not have much documentation, but I'm working on it.
Next time: compressed RMI socket factory
Compressing RMI socket factory
Now we can pull it all together.
/**
* An RMISocketFactory which enables compressed transmission.
* We use {@link DecompressingInputStream} and {@link CompressingOutputStream}
* for this.
*
* As we extend WrappingSocketFactory, this can be used on top of another
* {@link RMISocketFactory}.
*/
public class CompressedRMISocketFactory
extends WrappingSocketFactory
{
private static final long serialVersionUID = 1;
//------------ Constructors -----------------
/**
* Creates a CompressedRMISocketFactory based on a pair of
* socket factories.
*
* @param cFac the base socket factory used for creating client
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefaultSocketFactory() default socket factory}
* of the client system where this object is finally used for
* creating sockets.
* If not null, it should be serializable.
* @param sFac the base socket factory used for creating server
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefaultSocketFactory() default RMI Socket factory}.
* This will not be serialized to the client.
*/
public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
RMIServerSocketFactory sFac) {
super(cFac, sFac);
}
// [snipped more constructors]
//-------------- Implementation -------------
/**
* wraps a pair of streams into compressing/decompressing streams.
*/
protected StreamPair wrap(InputStream in, OutputStream out,
boolean server)
{
return new StreamPair(new DecompressingInputStream(in),
new CompressingOutputStream(out));
}
}
That's it. We now provide this factory object to UnicastRemoteObject.exportObject(...) as arguments (both for client and server factory), and all the communication will be compressed. (The version in my GitHub repository has a main method with an example.)
Of course, the compression benefits will not be huge for things like RMI, at least when you don't transfer large strings or similar stuff as arguments or return values.
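To get a feeling for this, one can measure how many bytes a flushed message occupies. The sketch below is only an illustration under some assumptions: it uses the Java 7 syncFlush constructor of DeflaterOutputStream (not the CompressingOutputStream workaround above) with raw deflate format, and the class CompressionGainDemo with its payloads is invented for the measurement.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

public class CompressionGainDemo {
    /** Returns the number of bytes on the wire after writing and flushing the payload. */
    static int wireSize(byte[] payload) {
        // nowrap == true: raw deflate data without zlib header and trailer
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DeflaterOutputStream out = new DeflaterOutputStream(wire, deflater, true);
            out.write(payload);
            out.flush();   // every RMI call or result ends with a flush
            return wire.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] tiny = {42};
        byte[] repetitive = new byte[10_000];
        Arrays.fill(repetitive, (byte) 'a');
        System.out.println("1 byte      -> " + wireSize(tiny) + " bytes");
        System.out.println("10000 bytes -> " + wireSize(repetitive) + " bytes");
    }
}
```

Tiny messages grow, because every flush adds a few marker bytes, while large repetitive payloads shrink considerably. Whether compression pays off therefore depends on the sizes of your arguments and return values.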
Next time (after I have slept): combining with an SSL socket factory.
Combining with an SSL socket factory
The Java part of this is easy, if we use the default classes:
CompressedRMISocketFactory fac =
new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
new SslRMIServerSocketFactory());
These classes (in javax.rmi.ssl) use the default SSLSocketFactory and SSLServerSocketFactory (in javax.net.ssl), which use the system's default keystore and trust store.
Thus it is necessary to create a key store with a key pair (for example with keytool -genkeypair -v), and provide it to the VM with the system properties javax.net.ssl.keyStore (the file name of the key store) and javax.net.ssl.keyStorePassword (the password for the key store).
On the client side, we need a trust store, i.e. a key store containing the server's public keys, or some certificate which signed the public keys of the server. For testing purposes, we can simply use the same key store as the server; for production you certainly would not want the server's private key on the client side. We provide this with the properties javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword.
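These properties are usually given on the command line with -D options, but they can also be set programmatically, as long as this happens before the first SSL socket is created. A minimal sketch follows; the class SslStoreProperties, the file names and the password are placeholders chosen for illustration, not values from this setup.

```java
public class SslStoreProperties {
    /** Server side: points the default SSL machinery at our key store. */
    static void configureServer(String keyStoreFile, String password) {
        System.setProperty("javax.net.ssl.keyStore", keyStoreFile);
        System.setProperty("javax.net.ssl.keyStorePassword", password);
    }

    /** Client side: points the default SSL machinery at our trust store. */
    static void configureClient(String trustStoreFile, String password) {
        System.setProperty("javax.net.ssl.trustStore", trustStoreFile);
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }

    public static void main(String[] args) {
        // Placeholder file names and password, for illustration only.
        configureServer("server-keystore.jks", "changeit");
        configureClient("client-truststore.jks", "changeit");
        System.out.println(System.getProperty("javax.net.ssl.keyStore"));
        System.out.println(System.getProperty("javax.net.ssl.trustStore"));
    }
}
```

In a real deployment, passing the passwords on the command line or hard-coding them like this is of course not ideal; this only shows which property names are involved.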
Then it gets down to this (on the server side):
Remote server =
UnicastRemoteObject.exportObject(new EchoServerImpl(),
0, fac, fac);
System.err.println("server: " + server);
Registry registry =
LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
registry.bind("echo", server);
The client is a stock client as for the previous examples:
Registry registry =
LocateRegistry.getRegistry("localhost",
Registry.REGISTRY_PORT);
EchoServer es = (EchoServer)registry.lookup("echo");
System.err.println("es: " + es);
System.out.println(es.echo("hallo"));
Now all communication to the EchoServer runs compressed and encrypted.
Of course, for complete security we would also want the communication with the registry to be SSL-protected, to avoid any man-in-the-middle attacks (which would also allow intercepting the communication with the EchoServer by giving the client a fake RMIClientSocketFactory or a fake server address).
flush() there, and this enables me using compression. (I had to subclass the DeflaterInputStream to allow a real flushing there.) – Lehet
flush() on your streams after writing a call or a result? You should trace this. – Lehet