Limiting SQL query to defined fields/columns in Graphene-SQLAlchemy

This question has also been posted as a GitHub issue at https://github.com/graphql-python/graphene-sqlalchemy/issues/134 but I thought I'd post it here too to tap into the SO crowd.

A full working demo can be found under https://github.com/somada141/demo-graphql-sqlalchemy-falcon.

Consider the following SQLAlchemy ORM class:

class Author(Base, OrmBaseMixin):
    __tablename__ = "authors"

    author_id = sqlalchemy.Column(
        sqlalchemy.types.Integer(),
        primary_key=True,
    )

    name_first = sqlalchemy.Column(
        sqlalchemy.types.Unicode(length=80),
        nullable=False,
    )

    name_last = sqlalchemy.Column(
        sqlalchemy.types.Unicode(length=80),
        nullable=False,
    )

Simply wrapped in an SQLAlchemyObjectType as such:

class TypeAuthor(SQLAlchemyObjectType):
    class Meta:
        model = Author

and exposed through:

author = graphene.Field(
    TypeAuthor,
    author_id=graphene.Argument(type=graphene.Int, required=False),
    name_first=graphene.Argument(type=graphene.String, required=False),
    name_last=graphene.Argument(type=graphene.String, required=False),
)

@staticmethod
def resolve_author(
    args,
    info,
    author_id: Union[int, None] = None,
    name_first: Union[str, None] = None,
    name_last: Union[str, None] = None,
):
    query = TypeAuthor.get_query(info=info)

    if author_id:
        query = query.filter(Author.author_id == author_id)

    if name_first:
        query = query.filter(Author.name_first == name_first)

    if name_last:
        query = query.filter(Author.name_last == name_last)

    author = query.first()

    return author

A GraphQL query such as:

query GetAuthor{
  author(authorId: 1) {
    nameFirst
  }
}

will cause the following raw SQL to be emitted (taken from the echo logs of the SQLA engine):

SELECT authors.author_id AS authors_author_id, authors.name_first AS authors_name_first, authors.name_last AS authors_name_last
FROM authors
WHERE authors.author_id = ?
 LIMIT ? OFFSET ?
2018-05-24 16:23:03,669 INFO sqlalchemy.engine.base.Engine (1, 1, 0)

As one can see, the query only requests the nameFirst field, i.e., the name_first column, yet the entire row is fetched. Of course the GraphQL response only contains the requested fields, i.e.,

{
  "data": {
    "author": {
      "nameFirst": "Robert"
    }
  }
}

but we have still fetched the entire row, which becomes a major issue when dealing with wide tables.

Is there a way to automagically communicate which columns are needed to SQLAlchemy so as to preclude this form of over-fetching?

Brasilin answered 24/5, 2018 at 6:51 Comment(0)

My question was answered on the GitHub issue (https://github.com/graphql-python/graphene-sqlalchemy/issues/134).

The idea is to identify the requested fields from the info argument (of type graphql.execution.base.ResolveInfo) that gets passed to the resolver function, using a get_field_names function such as the one below:

from graphql.language.ast import FragmentSpread


def get_field_names(info):
    """
    Parses a query info into a list of composite field names.
    For example the following query:
        {
          carts {
            edges {
              node {
                id
                name
                ...cartInfo
              }
            }
          }
        }
        fragment cartInfo on CartType { whatever }

    Will result in an array:
        [
            'carts',
            'carts.edges',
            'carts.edges.node',
            'carts.edges.node.id',
            'carts.edges.node.name',
            'carts.edges.node.whatever'
        ]
    """

    fragments = info.fragments

    def iterate_field_names(prefix, field):
        name = field.name.value

        if isinstance(field, FragmentSpread):
            _results = []
            new_prefix = prefix
            sub_selection = fragments[field.name.value].selection_set.selections
        else:
            _results = [prefix + name]
            new_prefix = prefix + name + "."
            if field.selection_set:
                sub_selection = field.selection_set.selections
            else:
                sub_selection = []

        for sub_field in sub_selection:
            _results += iterate_field_names(new_prefix, sub_field)

        return _results

    results = iterate_field_names('', info.field_asts[0])

    return results

The above function was taken from https://github.com/graphql-python/graphene/issues/348#issuecomment-267717809. That issue contains other versions of this function but I felt this was the most complete.

The identified fields are then used to limit the columns retrieved in the SQLAlchemy query as such:

from sqlalchemy.orm import load_only

fields = get_field_names(info=info)
query = TypeAuthor.get_query(info=info).options(load_only(*fields))

When applied to the above example query:

query GetAuthor{
  author(authorId: 1) {
    nameFirst
  }
}

The get_field_names function would return ['author', 'author.nameFirst']. However, as the 'original' SQLAlchemy ORM fields are snake-cased, the output of get_field_names cannot be passed to load_only as-is: the function needs to be updated to remove the author prefix and convert the field names via the graphene.utils.str_converters.to_snake_case function.
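As a rough illustration of that post-processing step, the sketch below turns the dotted names into column names. It is a minimal sketch, not the code from the GitHub issue: to_column_names is a hypothetical helper, and camel_to_snake is a simplified local stand-in for graphene's to_snake_case so that the snippet runs without graphene installed. It keeps only leaf fields directly under the root, which assumes nested selections (relationships) are handled separately.

```python
import re
from typing import List


def camel_to_snake(name: str) -> str:
    """Simplified stand-in for graphene's `to_snake_case` (assumption:
    plain camelCase names such as `nameFirst`)."""
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


def to_column_names(field_names: List[str], root: str) -> List[str]:
    """Hypothetical helper: keep the leaf fields directly under `root`,
    strip the `root.` prefix and convert the names to snake-case."""
    prefix = root + "."
    columns = []
    for field_name in field_names:
        # Skip the root entry itself and anything outside of it.
        if not field_name.startswith(prefix):
            continue
        remainder = field_name[len(prefix):]
        # Skip fields that live deeper than one level under the root.
        if "." in remainder:
            continue
        # Skip fields that have sub-selections, i.e. relationships.
        if any(other.startswith(field_name + ".") for other in field_names):
            continue
        columns.append(camel_to_snake(remainder))
    return columns


columns = to_column_names(["author", "author.nameFirst"], root="author")
print(columns)  # ['name_first']
```

The resulting list is what would be unpacked into load_only in place of the raw output of get_field_names.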

Long story short, the above approach yields a raw SQL query like this (note that name_last is no longer selected, while the primary key author_id is still loaded):

INFO:sqlalchemy.engine.base.Engine:SELECT authors.author_id AS authors_author_id, authors.name_first AS authors_name_first
FROM authors
WHERE authors.author_id = ?
 LIMIT ? OFFSET ?
2018-06-09 13:22:16,396 INFO sqlalchemy.engine.base.Engine (1, 1, 0)

Update

Should anyone land here wondering about implementation, I got around to implementing my own version of the get_field_names function as such:

from typing import List, Dict, Union, Type

import graphql
from graphql.language.ast import FragmentSpread
from graphql.language.ast import Field
from graphene.utils.str_converters import to_snake_case
import sqlalchemy.orm

from demo.orm_base import OrmBaseMixin

def extract_requested_fields(
    info: graphql.execution.base.ResolveInfo,
    fields: List[Union[Field, FragmentSpread]],
    do_convert_to_snake_case: bool = True,
) -> Dict:
    """Extracts the fields requested in a GraphQL query by processing the AST
    and returns a nested dictionary representing the requested fields.

    Note:
        This function should support arbitrarily nested field structures
        including fragments.

    Example:
        Consider the following query passed to a resolver and running this
        function with the `ResolveInfo` object passed to the resolver.

        >>> query = "query getAuthor{author(authorId: 1){nameFirst, nameLast}}"
        >>> extract_requested_fields(info, info.field_asts, True)
        {'author': {'name_first': None, 'name_last': None}}

    Args:
        info (graphql.execution.base.ResolveInfo): The GraphQL query info passed
            to the resolver function.
        fields (List[Union[Field, FragmentSpread]]): The list of `Field` or
            `FragmentSpread` objects parsed out of the GraphQL query and stored
            in the AST.
        do_convert_to_snake_case (bool): Whether to convert the fields as they
            appear in the GraphQL query (typically in camel-case) back to
            snake-case (which is how they typically appear in ORM classes).

    Returns:
        Dict: The nested dictionary containing all the requested fields.
    """

    result = {}
    for field in fields:

        # Set the `key` as the field name.
        key = field.name.value

        # Convert the key from camel-case to snake-case (if required).
        if do_convert_to_snake_case:
            key = to_snake_case(name=key)

        # Initialize `val` to `None`. Fields without nested-fields under them
        # will have a dictionary value of `None`.
        val = None

        # If the field is of type `Field` then extract the nested fields under
        # the `selection_set` (if defined). These nested fields will be
        # extracted recursively and placed in a dictionary under the field
        # name in the `result` dictionary.
        if isinstance(field, Field):
            if (
                hasattr(field, "selection_set") and
                field.selection_set is not None
            ):
                # Extract field names out of the field selections.
                val = extract_requested_fields(
                    info=info,
                    fields=field.selection_set.selections,
                    do_convert_to_snake_case=do_convert_to_snake_case,
                )
            result[key] = val
        # If the field is of type `FragmentSpread` then retrieve the fragment
        # from `info.fragments` and recursively extract the nested fields. As
        # we don't want the name of the fragment appearing in the result
        # dictionary (since it does not match anything in the ORM classes) the
        # extracted fields are merged directly into the current level.
        elif isinstance(field, FragmentSpread):
            # Retrieve referenced fragment.
            fragment = info.fragments[field.name.value]
            # Extract field names out of the fragment selections.
            val = extract_requested_fields(
                info=info,
                fields=fragment.selection_set.selections,
                do_convert_to_snake_case=do_convert_to_snake_case,
            )
            # Merge rather than overwrite so that fields collected before the
            # fragment spread are not lost.
            result.update(val)

    return result

which parses the AST into a dict preserving the structure of the query and (hopefully) matching the structure of the ORM.

Running the info object of a query like:

query getAuthor{
  author(authorId: 1) {
    nameFirst,
    nameLast
  }
}

produces

{'author': {'name_first': None, 'name_last': None}}

while a more complex query like this:

query getAuthor{
  author(nameFirst: "Brandon") {
    ...authorFields
    books {
      ...bookFields
    }
  }
}

fragment authorFields on TypeAuthor {
  nameFirst,
  nameLast
}

fragment bookFields on TypeBook {
  title,
  year
}

produces:

{'author': {'books': {'title': None, 'year': None},
  'name_first': None,
  'name_last': None}}
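One detail worth checking when walking the selections is what happens when a fragment spread sits next to plain fields: the fragment's fields need to be merged into the current level, otherwise siblings collected earlier would be dropped. The sketch below illustrates this in isolation. FakeField and FakeSpread are hypothetical stand-ins for the graphql AST nodes (so the snippet runs without graphql installed), and walk is a simplified version of the extraction logic, not the function above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union


@dataclass
class FakeField:
    """Hypothetical stand-in for graphql's `Field` AST node."""
    name: str
    selections: List["Node"] = field(default_factory=list)


@dataclass
class FakeSpread:
    """Hypothetical stand-in for graphql's `FragmentSpread` AST node."""
    name: str


Node = Union[FakeField, FakeSpread]


def walk(nodes: List[Node], fragments: Dict[str, List[Node]]) -> Dict[str, Optional[dict]]:
    """Build a nested dict of requested fields from stand-in AST nodes."""
    result: Dict[str, Optional[dict]] = {}
    for node in nodes:
        if isinstance(node, FakeSpread):
            # Merge the fragment's fields into the current level so that
            # sibling fields collected earlier are kept.
            result.update(walk(fragments[node.name], fragments))
        elif node.selections:
            # A field with sub-selections maps to a nested dictionary.
            result[node.name] = walk(node.selections, fragments)
        else:
            # A leaf field maps to `None`.
            result[node.name] = None
    return result


fragments = {"authorFields": [FakeField("name_first"), FakeField("name_last")]}
selection = [
    FakeField("author", [FakeField("author_id"), FakeSpread("authorFields")]),
]
tree = walk(selection, fragments)
print(tree)
# {'author': {'author_id': None, 'name_first': None, 'name_last': None}}
```

Here author_id is selected before the spread and still shows up in the result, which is the behaviour one would want from the real extraction as well.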

Now these dictionaries can be used to tell apart fields on the primary table (Author in this case), which have a value of None (such as name_first), from fields on a relationship of that primary table, which appear in a nested dictionary (such as the title field on the books relationship).

A simplistic approach to auto-applying those fields can take the form of the following function:

def apply_requested_fields(
    info: graphql.execution.base.ResolveInfo,
    query: sqlalchemy.orm.Query,
    orm_class: Type[OrmBaseMixin]
) -> sqlalchemy.orm.Query:
    """Updates the SQLAlchemy Query object by limiting the loaded fields of the
    table and its relationship to the ones explicitly requested in the GraphQL
    query.

    Note:
        This function is fairly simplistic in that it assumes that (1) the
        SQLAlchemy query only selects a single ORM class/table and that (2)
        relationship fields are only one level deep, i.e., that requested fields
        are either table fields or fields of the table relationship, e.g., it
        does not support fields of relationship relationships.

    Args:
        info (graphql.execution.base.ResolveInfo): The GraphQL query info passed
            to the resolver function.
        query (sqlalchemy.orm.Query): The SQLAlchemy Query object to be updated.
        orm_class (Type[OrmBaseMixin]): The ORM class of the selected table.

    Returns:
        sqlalchemy.orm.Query: The updated SQLAlchemy Query object.
    """

    # Extract the fields requested in the GraphQL query.
    fields = extract_requested_fields(
        info=info,
        fields=info.field_asts,
        do_convert_to_snake_case=True,
    )

    # We assume that the top level of the `fields` dictionary only contains a
    # single key referring to the GraphQL resource being resolved.
    tl_key = list(fields.keys())[0]
    # We assume that any keys that have a value of `None` (as opposed to
    # dictionaries) are fields of the primary table.
    table_fields = [
        key for key, val in fields[tl_key].items()
        if val is None
    ]

    # We assume that any keys that have a value being a dictionary are
    # relationship attributes on the primary table with the keys in the
    # dictionary being fields on that relationship. Thus we create a list of
    # `[relationship_name, relationship_fields]` lists to be used in the
    # `joinedload` definitions.
    relationship_fieldsets = [
        [key, val.keys()]
        for key, val in fields[tl_key].items()
        if isinstance(val, dict)
    ]

    # Assemble a list of `joinedload` definitions on the defined relationship
    # attribute name and the requested fields on that relationship.
    options_joinedloads = []
    for relationship_fieldset in relationship_fieldsets:
        relationship = relationship_fieldset[0]
        rel_fields = relationship_fieldset[1]
        options_joinedloads.append(
            sqlalchemy.orm.joinedload(
                getattr(orm_class, relationship)
            ).load_only(*rel_fields)
        )

    # Update the SQLAlchemy query by limiting the loaded fields on the primary
    # table as well as by including the `joinedload` definitions.
    query = query.options(
        sqlalchemy.orm.load_only(*table_fields),
        *options_joinedloads
    )

    return query
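
The function above stops at one level of relationships. One possible way to go deeper, sketched here under assumptions and not part of the original solution, is to first flatten the nested dictionary into (relationship path, columns) pairs. collect_load_paths is a hypothetical helper and the publisher relationship in the example is made up for illustration. Turning each pair into chained loader options on the real ORM classes is left out.

```python
from typing import Dict, List, Optional, Tuple

FieldTree = Dict[str, Optional[dict]]
LoadPath = Tuple[Tuple[str, ...], List[str]]


def collect_load_paths(
    selection: FieldTree,
    path: Tuple[str, ...] = (),
) -> List[LoadPath]:
    """Hypothetical helper: flatten a nested field dictionary into
    `(relationship path, columns)` pairs, one pair per nesting level."""
    # Keys with a value of `None` are plain columns at this level.
    columns = [key for key, val in selection.items() if val is None]
    paths: List[LoadPath] = [(path, columns)]
    # Keys with a dictionary value are relationships: recurse into them
    # while extending the path with the relationship name.
    for key, val in selection.items():
        if isinstance(val, dict):
            paths.extend(collect_load_paths(val, path + (key,)))
    return paths


# `publisher` is a made-up relationship on books, used only for illustration.
requested = {
    "name_first": None,
    "books": {"title": None, "publisher": {"name": None}},
}
load_paths = collect_load_paths(requested)
for rel_path, cols in load_paths:
    print(rel_path, cols)
# () ['name_first']
# ('books',) ['title']
# ('books', 'publisher') ['name']
```

The empty path corresponds to the primary table, and each longer path names the chain of relationship attributes that would need to be loaded to reach those columns.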
Brasilin answered 9/6, 2018 at 3:20 Comment(4)
This is exactly what I was looking for, and thank you for posting it!! My only question is - why isn't this output natively available in the info object, rather than needing such a complex custom solution? That's a genuine question- it seems like something you'd often want to make sure you're fetching the right data (preventing unnecessary / wasteful queries down the line) from the DB for further processing- but I'm new to Graphene/GraphQL so I'm wondering if I'm just missing something about how Graphene/GraphQL are "meant" to work? – Castled
@Castled I think the issue isn't with Graphene/GraphQL. Those solutions don't care about how you fetch the data or how performant the fetching is. The issue is in the Graphene-SQLAlchemy package for which the primary concern is mapping the SQLAlchemy schema to a GraphQL one. Producing such queries while covering all edge-cases would be far too complex (if not impossible) so the actual fetching falls to the developer. My solution above is not one-size-fits-all and falls flat in many cases given how insanely complex SQLAlchemy is 😆. – Brasilin
Isn't it a more general question in GraphQL of when you're querying many-to-many in a database, regardless of what ORM you're using, if any? If you have a many-to-many relationship of a and b, and your query is { a { b } }, it seems like you'd want to do a join of a and b (grouping by a) in the resolver of a... otherwise, the resolver of b within a would hit the database for every single row in a. But there's no way for the resolver of a to know it needs to join against b without a solution like what you posted. – Castled
how would you use the apply_requested_fields function? Can you provide an example applying it on a standard query with other clauses like .filter_by(...) or .one() or .all() please? – Matronize
