Managing connection with non-buffered queries in Dapper

I have recently started using Dapper. Everything seems nice and easy, but one thing keeps confusing me: connection management.

As per the documentation:

Dapper does not manage your connection's lifecycle, it assumes the connection it gets is open AND has no existing datareaders enumerating (unless MARS is enabled)

In light of this I started doing this inside the implementation of my repository methods:

using (var db = new SqliteConnection(connectionString)) {
    // call Dapper methods here
}

Then I came across a table with a large number of records, so I thought of returning an IEnumerable<T> by passing buffered: false to the Query<> method. When I started enumerating the enumerable in the front end, boom: an exception saying the connection was closed and disposed, which is expected since I am wrapping my calls in the preceding using block.
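The failure comes from deferred execution: with buffered: false the query hands back a lazy sequence, so no rows are read until the caller starts enumerating, and by then the using block has already disposed the connection. Below is a minimal sketch of that mechanism. It assumes a hypothetical FakeConnection class in place of a real database connection and Dapper; FakeConnection, ReadRows and DeferredExecutionDemo are illustrative names, not library APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a database connection; not part of Dapper or ADO.NET.
public sealed class FakeConnection : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() { IsDisposed = true; }

    // Lazy iterator: the body does not run until the caller enumerates.
    public IEnumerable<int> ReadRows(int count)
    {
        for (var i = 0; i < count; i++)
        {
            if (IsDisposed)
                throw new ObjectDisposedException(nameof(FakeConnection));
            yield return i;
        }
    }
}

public static class DeferredExecutionDemo
{
    // Mirrors the repository method from the question: the lazy sequence
    // leaves the using block before anything has been read.
    public static IEnumerable<int> GetRowsLazily()
    {
        using (var connection = new FakeConnection())
        {
            return connection.ReadRows(3);
        }
    }

    // Returns true if enumerating the escaped sequence fails.
    public static bool EnumerationFails()
    {
        try
        {
            GetRowsLazily().ToList();
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```

Calling DeferredExecutionDemo.EnumerationFails() exercises the same sequence of events as the question: the connection is disposed first and the rows are requested afterwards.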

Question: What is the best way to solve this?
Side question: Is the way I am managing the connection the preferred way to go about it?

Prudish answered 11/4, 2014 at 22:20 Comment(1)
Just in case this helps someone else, I used buffered: false and it corrected my connection issue with Dapper. - Cotsen

I'd offer this repository pattern:

public class Repository
{
    private readonly string _connectionString;

    public Repository(string connectionString)
    {
        _connectionString = connectionString;
    }

    protected T GetConnection<T>(Func<IDbConnection, T> getData)
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();
            return getData(connection);
        }
    }

    protected TResult GetConnection<TRead, TResult>(Func<IDbConnection, TRead> getData, Func<TRead, TResult> process)
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();
            var data = getData(connection);
            return process(data);
        }
    }
}

For buffered queries, use the first overload of the GetConnection method; for non-buffered queries, use the second, specifying a callback that processes the data:

public class MyRepository : Repository
{
    public MyRepository(string connectionString) : base(connectionString)
    {
    }

    public IEnumerable<MyMapObject> GetData()
    {
        return GetConnection(c => c.Query<MyMapObject>(query));
    }

    public IEnumerable<ResultObject> GetLotsOfData(Func<IEnumerable<MyMapObject>, IEnumerable<ResultObject>> process)
    {
        return GetConnection(c => c.Query<MyMapObject>(query, buffered: false), process);
    }
}

Very basic usage:

static void Main(string[] args)
{
    var repository = new MyRepository(connectionString);
    var data = repository.GetLotsOfData(ProcessData);
}

public static IEnumerable<ResultObject> ProcessData(IEnumerable<MyMapObject> data)
{
    foreach (var record in data)
    {
        var result = new ResultObject();
        //do some work...
        yield return result;
    }
}

But keep in mind that the connection may stay open for a long time in this case...
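One caveat about the sample above: ProcessData is itself an iterator (it uses yield return), so the sequence that GetLotsOfData returns is still lazy, and as far as I can tell enumerating it after GetConnection has returned would run the query against an already disposed connection. Either consume the rows inside the callback before it returns, or make the repository method itself an iterator so that the using block stays active while the caller enumerates. Below is a minimal sketch of the second option. It assumes a hypothetical FakeConnection standing in for a real connection and for Dapper's unbuffered Query; FakeConnection, ReadRows, StreamRows and LastConnection are illustrative names, not library APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a database connection; not part of Dapper or ADO.NET.
public sealed class FakeConnection : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() { IsDisposed = true; }

    // Lazy iterator that refuses to produce rows once the connection is disposed.
    public IEnumerable<int> ReadRows(int count)
    {
        for (var i = 0; i < count; i++)
        {
            if (IsDisposed)
                throw new ObjectDisposedException(nameof(FakeConnection));
            yield return i;
        }
    }
}

public static class StreamingRepositoryDemo
{
    // Records the connection used by the most recent enumeration (for inspection only).
    public static FakeConnection LastConnection { get; private set; }

    // Because this method contains yield return, the using block becomes part of
    // the iterator: the connection is created when enumeration starts and is
    // disposed when enumeration completes or the enumerator is disposed.
    public static IEnumerable<string> StreamRows(int count)
    {
        using (var connection = new FakeConnection())
        {
            LastConnection = connection;
            foreach (var row in connection.ReadRows(count))
            {
                yield return "row " + row;
            }
        }
    }
}
```

The trade-off mentioned above still applies: the connection stays open until the caller finishes enumerating or disposes the enumerator (foreach does this automatically, including on an early break), so a slow consumer holds it for longer.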

Deprived answered 14/4, 2014 at 14:4 Comment(2)
Ohh man, what was I thinking. Thank you Sergio for reminding me of the yield keyword. I don't know why I forgot about it. One thing though: you don't need all of this plumbing to get this working; you can just use yield return result directly inside the method implementation in the repository. If no one else comes up with a better answer, I will award you the bounty. Thank you. - Prudish
Perhaps the connection doesn't need to be opened explicitly in newer versions of Dapper. The SqlMapper class checks the state of the connection and, if it's closed, calls Open() on it. - Demission

@Sergio, AWESOME! Thanks for such a great pattern. I modified it slightly to be async so that I can use it with Dapper's async methods. Makes my entire request chain async, from the controllers all the way back to the DB! Gorgeous!

public abstract class BaseRepository
{
    private readonly string _ConnectionString;

    protected BaseRepository(string connectionString)
    {
        _ConnectionString = connectionString;
    }

    // use for buffered queries
    protected async Task<T> WithConnection<T>(Func<IDbConnection, Task<T>> getData)
    {
        try
        {
            using (var connection = new SqlConnection(_ConnectionString))
            {
                await connection.OpenAsync();
                return await getData(connection);
            }
        }
        catch (TimeoutException ex)
        {
            throw new Exception(String.Format("{0}.WithConnection() experienced a SQL timeout", GetType().FullName), ex);
        }
        catch (SqlException ex)
        {
            throw new Exception(String.Format("{0}.WithConnection() experienced a SQL exception (not a timeout)", GetType().FullName), ex);
        }
    }

    // use for non-buffered queries
    protected async Task<TResult> WithConnection<TRead, TResult>(Func<IDbConnection, Task<TRead>> getData, Func<TRead, Task<TResult>> process)
    {
        try
        {
            using (var connection = new SqlConnection(_ConnectionString))
            {
                await connection.OpenAsync();
                var data = await getData(connection);
                return await process(data);
            }
        }
        catch (TimeoutException ex)
        {
            throw new Exception(String.Format("{0}.WithConnection() experienced a SQL timeout", GetType().FullName), ex);
        }
        catch (SqlException ex)
        {
            throw new Exception(String.Format("{0}.WithConnection() experienced a SQL exception (not a timeout)", GetType().FullName), ex);
        }
    }
}

Use with Dapper like this:

public class PersonRepository : BaseRepository
{
    public PersonRepository(string connectionString): base (connectionString) { }

    // Assumes you have a Person table in your DB that 
    // aligns with a Person POCO model.
    //
    // Assumes you have an existing SQL sproc in your DB 
    // with @Id UNIQUEIDENTIFIER as a parameter. The sproc 
    // returns rows from the Person table.
    public async Task<Person> GetPersonById(Guid Id)
    {
        return await WithConnection(async c =>
        {
            var p = new DynamicParameters();
            p.Add("Id", Id, DbType.Guid);
            var people = await c.QueryAsync<Person>(sql: "sp_Person_GetById", param: p, commandType: CommandType.StoredProcedure);
            return people.FirstOrDefault();
        });
    }
}
Cacilia answered 5/2, 2015 at 21:26 Comment(0)
