How to declare a shared DataFrame in Julia for parallel computing

I have a large simulation on a DataFrame df which I am trying to parallelize and save the results of the simulations in a DataFrame called simulation_results.

The parallelization loop itself works fine. The problem is how to collect the results: if I were storing them in a plain array, I would declare it as a SharedArray before the loop, but I don't know how to declare simulation_results as a "shared DataFrame" that is visible to all worker processes and can be modified by them.
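
For a plain numeric result, the pattern I would normally use looks something like this (a minimal sketch; the per-simulation value is just a placeholder):

using Distributed, SharedArrays
addprocs(4)

@everywhere using SharedArrays

nsims = 1000
results = SharedArray{Float64}(nsims)  # backed by shared memory, writable by all workers

@sync @distributed for sim in 1:nsims
    results[sim] = rand()  # placeholder for one simulation's numeric result
end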

My actual code, where the results are a DataFrame, looks like this:

using Distributed
addprocs(length(Sys.cpu_info()))

@everywhere begin
  using <required packages>

  df = CSV.read("/path/data.csv", DataFrame)

  simulation_results = similar(df, 0) #I need to declare this as shared and modifiable by all processors 
  
  nsims = 100000

end


@sync @distributed for sim in 1:nsims
    nsim_result = similar(df, 0)
    <the code which for one simulation stores the results in nsim_result >
    append!(simulation_results, nsim_result)
end

The problem is that since simulation_results is not shared and modifiable across processes, each worker appends to its own local copy; after the loop runs, the simulation_results on the main process is still the empty DataFrame created by @everywhere simulation_results = similar(df, 0).

Would really appreciate any help on this! Thanks!

Fideliafidelio answered 5/7, 2022 at 22:32

The pattern for distributed computing in Julia is much simpler than what you are trying to do.

Your code should look more or less like this:

@everywhere using <required packages>

df = CSV.read("/path/data.csv", DataFrame)
nsims = 100000

simulation_results = @distributed (append!) for sim in 1:nsims
    <the code which for one simulation stores the results in nsim_result>
    nsim_result  # the value of the last expression in each iteration is what (append!) collects
end

Note that you do not need to load df on every process in the Julia cluster: @distributed serializes the variables captured by the loop body (here df) and sends them to the workers. You also do not need @sync, because the reducing form of @distributed with an aggregator function (append!) waits for all iterations and combines their results.

A minimal working example (run with addprocs(4)):

@everywhere using Distributed, DataFrames
df = DataFrame(a = 1:5, b = rand())  # the scalar rand() is recycled, so every row of b holds the same value

and now the result:

julia> @distributed (append!) for i in 2:5
           DataFrame(bsum=sum(df.b[1:myid()]),c=myid())
       end
4×2 DataFrame
 Row │ bsum      c
     │ Float64   Int64
─────┼─────────────────
   1 │ 0.518127      2
   2 │ 0.777191      3
   3 │ 1.03625       4
   4 │ 1.29532       5
Ratify answered 6/7, 2022 at 15:32 Comment(1)
Thank you so much! This strategy worked perfectly. It was also faster relative to the solution of converting from DF to matrix and back to DF. – Fideliafidelio

As long as the entries of your DataFrame df that you process are numeric, you can pass it back and forth as a matrix:

mynames = names(df)
matrix = Matrix(df)

Then convert matrix to a SharedArray and do the computation on it. Afterwards, convert it back to a plain matrix and rebuild the DataFrame:

dfprocessed = DataFrame(matrix, mynames)

Note that this method may fail if your DataFrame's columns are not all of a uniform type. It works best when all columns are integer or all are floating point; you may have to drop non-numeric columns first or recode them as numeric levels.
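
A minimal sketch of that round trip, assuming all columns are Float64 and using a placeholder computation (doubling every column) in place of the real work:

using Distributed, SharedArrays, DataFrames
addprocs(4)

@everywhere using SharedArrays

df = DataFrame(x = rand(10), y = rand(10))
mynames = names(df)

shared = SharedArray{Float64}(nrow(df), ncol(df))  # shared-memory buffer visible to all workers
shared .= Matrix(df)                               # copy the DataFrame's values in

@sync @distributed for j in 1:size(shared, 2)
    shared[:, j] .= 2 .* shared[:, j]              # placeholder per-column computation
end

dfprocessed = DataFrame(Array(shared), mynames)    # back to a DataFrame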

Alyce answered 6/7, 2022 at 4:36 Comment(3)
The operations described here will work even if the data is not homogeneous. If you want auto-detection of the column types afterwards, just do identity.(DataFrame(matrix, mynames)). – Haematic
Thanks! This is a very useful direction. My df has Floats, Ints and Strings, so the auto-detection technique is quite useful in this regard. However, I think I am making a mistake in declaring the matrix as a shared array. I tried matrix = SharedArrays{Any}(Matrix(df)) but got a TypeError on worker 2 onwards: TypeError: in Type{...} expression, expected UnionAll, got a value of type Module. – Fideliafidelio
But with matrix = SharedArrays{Float64}(Matrix(df)) I got: ERROR: On worker 2: SystemError: shm_open() failed for /jl0561767X856PoLYbxXOMM1kgei: Permission denied, preceded by messages from every worker, e.g. From worker 5: Requested size(MB): 13; Please ensure requested size is within system limits. If not, increase system limits and try again. System max size of single shmem segment(MB): 4; System max size of all shmem segments(MB): 16. – Fideliafidelio
