The first thing you have to understand is that code like
    try (Connection connection = dataSource.getConnection()) {
        …
        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            …
            return stream;
        }
    }
does not work: by the time you leave the try blocks, the resources are closed, while the processing of the Stream hasn’t even started.
The resource management construct “try with resources” works for resources used within a block scope inside a method, but you are creating a factory method returning a resource. Therefore you have to ensure that closing the returned stream closes the resources, and the caller is responsible for closing the Stream.
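The timing problem can be reproduced without a database. The following is a minimal sketch, assuming a hypothetical TrackedResource class as a stand-in for the JDBC resources; the names EarlyCloseDemo, brokenStream and workingStream are illustrative only and not part of any library.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class EarlyCloseDemo {
    /** Hypothetical stand-in for a JDBC resource; it only records whether it was closed. */
    static final class TrackedResource implements AutoCloseable {
        boolean closed;

        // Fails once the resource is closed, like reading from a closed ResultSet would.
        String read(int i) {
            if (closed) throw new IllegalStateException("resource already closed");
            return "row" + i;
        }

        @Override
        public void close() { closed = true; }
    }

    /** Broken: the try block closes the resource before the lazy stream reads anything. */
    static Stream<String> brokenStream(TrackedResource r) {
        try (TrackedResource res = r) {
            return Stream.of(1, 2, 3).map(res::read); // only builds the pipeline
        }
    }

    /** Working: the resource stays open and is closed when the stream is closed. */
    static Stream<String> workingStream(TrackedResource r) {
        return Stream.of(1, 2, 3).map(r::read).onClose(r::close);
    }

    public static void main(String[] args) {
        TrackedResource a = new TrackedResource();
        Stream<String> broken = brokenStream(a);
        System.out.println("closed before use: " + a.closed);
        try {
            broken.collect(Collectors.toList());
        } catch (IllegalStateException ex) {
            System.out.println("broken variant failed: " + ex.getMessage());
        }

        TrackedResource b = new TrackedResource();
        try (Stream<String> ok = workingStream(b)) { // the caller closes the stream
            System.out.println(ok.collect(Collectors.toList()));
        }
        System.out.println("closed after use: " + b.closed);
    }
}
```

In the working variant nothing is closed unless the caller closes the stream, which is why the caller-side try block matters.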
Further, you need a function which produces an item out of a single row of the ResultSet. Supposing you have a method like
    Record createRecord(ResultSet rs) {
        …
    }
you may create a Stream<Record> basically like
    Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE, Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);
But to do it correctly you have to incorporate exception handling and the closing of resources. You can use Stream.onClose to register an action that will be performed when the Stream gets closed, but it has to be a Runnable, which cannot throw checked exceptions. Similarly, the tryAdvance method is not allowed to throw checked exceptions. And since we can’t simply nest try(…) blocks here, the program logic of suppressing exceptions thrown in close when there is already a pending exception doesn’t come for free.
To help us here, we introduce a new type which can wrap closing operations that may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseable itself, it can utilize the try(…) construct to chain close operations safely:
    interface UncheckedCloseable extends Runnable, AutoCloseable {
        default void run() {
            try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
        }
        static UncheckedCloseable wrap(AutoCloseable c) {
            return c::close;
        }
        default UncheckedCloseable nest(AutoCloseable c) {
            return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
        }
    }
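To see what the chaining buys, here is a small sketch that repeats the interface so it compiles on its own and uses lambdas as hypothetical stand-ins for Connection, Statement and ResultSet. The NestDemo class and its method names are assumptions made for illustration. It suggests two properties: the most recently nested resource is closed first, and a failure in an outer close is attached as a suppressed exception instead of replacing the first failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch (Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return () -> { try (UncheckedCloseable c1 = this) { c.close(); } };
    }
}

final class NestDemo {
    /** Records the order in which three stand-in resources get closed. */
    static List<String> closeInOrder() {
        List<String> log = new ArrayList<>();
        UncheckedCloseable close = UncheckedCloseable.wrap(() -> log.add("connection"));
        close = close.nest(() -> log.add("statement"));
        close = close.nest(() -> log.add("resultSet"));
        close.run(); // innermost (last nested) resource is closed first
        return log;
    }

    /** Both close operations fail; returns the exception that run() delivers. */
    static RuntimeException closeWithFailures() {
        UncheckedCloseable close =
                UncheckedCloseable.wrap(() -> { throw new IllegalStateException("outer"); });
        close = close.nest(() -> { throw new IOException("inner"); });
        try {
            close.run();
            return null;
        } catch (RuntimeException ex) {
            return ex; // cause: IOException("inner"), carrying "outer" as suppressed
        }
    }

    public static void main(String[] args) {
        System.out.println(closeInOrder());
        RuntimeException ex = closeWithFailures();
        System.out.println("cause: " + ex.getCause());
        System.out.println("suppressed: " + ex.getCause().getSuppressed()[0]);
    }
}
```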
With this, the entire operation becomes:
    private Stream<Record> tableAsStream(DataSource dataSource, String table)
            throws SQLException {
        // Accumulates the close operations of everything opened so far.
        UncheckedCloseable close=null;
        try {
            Connection connection = dataSource.getConnection();
            close=UncheckedCloseable.wrap(connection);
            String sql = "select * from " + table;
            PreparedStatement pSt = connection.prepareStatement(sql);
            close=close.nest(pSt);
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            close=close.nest(resultSet);
            return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
                    Long.MAX_VALUE,Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super Record> action) {
                    try {
                        if(!resultSet.next()) return false;
                        action.accept(createRecord(resultSet));
                        return true;
                    } catch(SQLException ex) {
                        // tryAdvance may not throw checked exceptions
                        throw new RuntimeException(ex);
                    }
                }
            }, false).onClose(close);
        } catch(SQLException sqlEx) {
            // Initialization failed: close what was opened and keep
            // any failure of that close as a suppressed exception.
            if(close!=null)
                try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
            throw sqlEx;
        }
    }
This method wraps the necessary close operations for all resources, Connection, Statement and ResultSet, within one instance of the utility type described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.
Therefore the caller has to ensure proper closing like
    try(Stream<Record> s=tableAsStream(dataSource, table)) {
        // stream operation
    }
Note that the delivery of an SQLException via RuntimeException has also been added to the tryAdvance method. Therefore you may now add throws SQLException to the createRecord method without problems.
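On the calling side, the wrapped exception arrives as a RuntimeException whose cause is the original SQLException. If the caller prefers to see the SQLException again, it can unwrap the cause. The following is only a sketch under stated assumptions: Cursor is a hypothetical stand-in for ResultSet, and UnwrapDemo, asStream, cursor and process are illustrative names, not part of the answer's code or of JDBC.

```java
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class UnwrapDemo {
    /** Hypothetical stand-in for ResultSet: a cursor whose methods may throw SQLException. */
    interface Cursor {
        boolean next() throws SQLException;
        String get() throws SQLException;
    }

    /** Same tryAdvance pattern as above: SQLException is delivered via RuntimeException. */
    static Stream<String> asStream(Cursor cursor) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                try {
                    if (!cursor.next()) return false;
                    action.accept(cursor.get());
                    return true;
                } catch (SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false);
    }

    /** Three-row cursor; reading row number failAt throws (0 means no failure). */
    static Cursor cursor(int failAt) {
        return new Cursor() {
            int row;
            public boolean next() { return ++row <= 3; }
            public String get() throws SQLException {
                if (row == failAt) throw new SQLException("row " + row + " unreadable");
                return "row" + row;
            }
        };
    }

    /** Consumes the stream and rethrows a wrapped SQLException as itself. */
    static String process(Cursor cursor) throws SQLException {
        try (Stream<String> s = asStream(cursor)) {
            return s.collect(Collectors.joining(","));
        } catch (RuntimeException ex) {
            if (ex.getCause() instanceof SQLException) throw (SQLException) ex.getCause();
            throw ex;
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(process(cursor(0)));
            System.out.println(process(cursor(2)));
        } catch (SQLException ex) {
            System.out.println("recovered: " + ex.getMessage());
        }
    }
}
```

Whether to unwrap at all is a design choice; a dedicated unchecked exception type would make the check more precise than testing the cause of any RuntimeException.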
java.util.stream.Stream may not actually be suited for what you have in mind. – Photina
ResultSet is like a stream. You can only process one row of the result at once. Or do you want to process the ResultSet with the streaming API? – Lager
[…] ResultSet to Java 8 stream and pass this stream object to another class. In another class I would like to iterate over this stream and write the results to File. – Enthusiasm
[…] SELECT * queries and are now blaming the framework for doing exactly what you told it to... Why not just write smarter queries instead? Also, the OP writes fetch(resultSet), which eagerly fetches everything into memory (as documented), rather than writing fetchLazy(resultSet), which keeps an open cursor. It works as designed and documented... – Crwth
[…] SELECT * or if I am loading clobs inadvertently. I needed to lazily load clob in a lazy stream, which would give flexibility to decide when to load clobs or not load them at all based on some conditions. I understand it's all documented, but I found it very surprising that JOOQ loaded the whole clob eagerly in a lazy stream. Anyways, I found a solution by moving the clob values to a blob store in cloud. – Marquardt
[…] SELECT * or SELECT a, b, clob. The point is, why include the clob column in the SELECT clause when you could use "some conditions" to decide whether you actually need to include it in your SELECT clause? I'd still be very happy to offer answering a specific new question if this isn't clear... It doesn't seem to be...? – Crwth