Stream fetched from Postgres with jOOQ not returning results from class

Issue

I am attempting to stream results from a Postgres query to a frontend application, rather than eagerly fetching all results. The problem is that I can see the streamed results only in my terminal (i.e. first in "org.jooq.tools.LoggerListener : Record fetched: ..." and then with a stream.get().forEach(s -> debug)), while the class that references this stream produces only null values when the frontend reads the ResultSet.

This data may be used for other tasks as well (e.g. visualization, downloading / exporting, summary statistics, etc.). I have been looking through the documentation and posts about jOOQ, which I am using as my ORM, and there are the following methods which I am trying to use :

Eagerly fetching with the following works perfectly for now, but this will return everything in one giant ResponseEntity and won't stream results :


Current classes

DataController.java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult.java

public class QueryResult {

  // Field backing the constructor, getter and setter below
  private Stream<Record> result;

  public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService.java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi.js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Data.jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

Returned Result in console

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...

Stack :

  • Docker : 19.03.5
  • Spring Boot : v2.1.8.RELEASE
  • Node : v12.13.1
  • React : 16.9.0
  • OpenJDK : 12.0.2
  • jOOQ : 3.12.3
  • postgres : 10.7
Sidedress answered 28/1, 2020 at 20:7 Comment(1)
I ran into the same issue. Do you have any updates? - Fluorspar

The whole point of the Java Stream API is for such a stream to be consumed at most once. It does not have any buffering feature, nor does it support a push-based streaming model like reactive stream implementations do.
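
To make the "at most once" rule concrete, here is a minimal sketch using only the JDK. It does not touch jOOQ: a plain list stands in for the query result, and the class and method names are made up for illustration. The first method shows that a second terminal operation on the same Stream instance is rejected. The second shows that calling a Supplier twice, as the question's code does with results.get(), evaluates the source twice, so each call would run the query again.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class; a List stands in for rows fetched from the database.
class SingleUseStreamDemo {

    // Returns true when a second terminal operation on the same Stream
    // instance is rejected with IllegalStateException.
    static boolean secondUseFails(Stream<String> stream) {
        stream.forEach(row -> { });      // first terminal operation consumes the stream
        try {
            stream.forEach(row -> { });  // same instance used again
            return false;
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            return true;
        }
    }

    // Counts how often the source behind a Supplier is evaluated when the
    // Supplier is called twice.
    static int sourceEvaluations(List<String> rows) {
        AtomicInteger evaluations = new AtomicInteger();
        Supplier<Stream<String>> results = () -> {
            evaluations.incrementAndGet(); // stands in for executing the SQL query
            return rows.stream();
        };
        results.get().forEach(row -> { }); // e.g. debug logging
        results.get().forEach(row -> { }); // e.g. handing the stream to the serializer
        return evaluations.get();
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails(Stream.of("a", "b")));
        System.out.println(sourceEvaluations(List.of("a", "b")));
    }
}
```

So a Supplier avoids the exception, but only by producing a new, independent stream on every call.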

You could add another API to your stack, such as Reactor (there are others, but since you're already using Spring...), which supports buffering and replaying streams to several consumers, but that has nothing to do with jOOQ directly and will heavily influence your application's architecture.

Notice that jOOQ's ResultQuery extends org.reactivestreams.Publisher and JDK 9's Flow.Publisher for better interoperability with such reactive streams.
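
As a rough illustration of the push-based model, the sketch below uses only the JDK's Flow API. The JDK's SubmissionPublisher stands in for a publisher such as a jOOQ ResultQuery, which is an assumption made so the example runs without a database. The class names and row values are invented for the example. Two subscribers are attached, and each one gets every item pushed to it, which a single java.util.stream.Stream cannot do.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; SubmissionPublisher stands in for a real row publisher.
class FlowPublisherSketch {

    // Collects every item pushed to it and signals when the publisher is done.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Pushes the given rows to two subscribers and returns what each received.
    static List<List<String>> publishToTwo(List<String> rows) {
        CollectingSubscriber<String> first = new CollectingSubscriber<>();
        CollectingSubscriber<String> second = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            rows.forEach(publisher::submit); // buffered per subscriber, delivered asynchronously
        } // close() signals onComplete once buffered items are delivered
        try {
            first.done.await(5, TimeUnit.SECONDS);
            second.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(List.copyOf(first.items), List.copyOf(second.items));
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(List.of("row1", "row2", "row3")));
    }
}
```

Note that SubmissionPublisher only delivers items submitted after a subscriber has subscribed. Replaying earlier items to late subscribers is the kind of feature a library such as Reactor adds on top.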

Emelda answered 29/1, 2020 at 11:37 Comment(2)
I ran into the same issue. I'd like to return a Stream from my Spring controller, but I get an empty list. I found a related question, #37156057, but it doesn't help because I get the error message: This ResultSet is closed.; nested exception is org.postgresql.util.PSQLException: This ResultSet is closed. - Fluorspar
@wakedeer: Can you please ask a new question with details? - Emelda
