Preface: yes, there actually is a solution to your issue. See code at bottom for that. But, before I get there, I'll go into some explanation.
I think the root of the problem here is memory access. First off, although I haven't rigorously investigated it, I suspect that there are a moderate number of improvements that could be made to Julia's underlying code in order to improve the way that it handles memory access in parallel processing. Nevertheless, in this case, I suspect that any underlying issues with the base code, if such actually exist, aren't so much at fault. Instead, I think it is useful to think carefully about what exactly is going on in your code and what it means vis-a-vis memory access.
A key thing to keep in mind when working in Julia is that it stores Arrays in column-major order. That is, it stores them as stacks of columns on top of each other. This generalizes to dimensions > 2 as well. See this very helpful segment of the Julia performance tips for more info. The implication of this is that it is fast to access one row after another after another in a single column. But, if you need to be jumping around columns, then you get into trouble. Yes, accessing ram memory might be relatively fast, but accessing cache memory is much, much faster, and so if your code allows for a single column or so to be loaded from ram into cache and then worked on, then you'll do much better than if you need to be doing lots of swapping between ram and cache. Here in your code, you're switching from column to column between your computations like nobody's business. For instance, in your pmap
each process gets a different column of the shared array to work on. Then, each goes down the rows of that column and modifies the values in it. But, since they are trying to work in parallel with one another, and the whole array is too big to fit into your cache, there is lots of swapping between ram and cache that happens and that really slows you down. In theory, perhaps a clever enough under-the-hood memory management system could be devised to address this, but I don't really know - that goes beyond my pay grade. The same thing of course is happening to your accesses to your other objects.
Another thing to keep in mind in general when parallelizing is your ratio of flops (i.e. computer calculations) to read/write operations. Flops tend to parallelize well, you can have different cores, processes, etc. doing their own little computations on their own bits of data that they hold in their tiny caches. But, read/write operations don't parallelize so well. There are some things that can be done to engineer hardware systems to improve on this. But in general, if you have a given computer system with say, two cores, and you add four more cores to it, your ability to perform flops will increase three fold, but your ability to read/write data to/from ram won't really improve so much. (note: this is an oversimplication, a lot depends on your system). Nevertheless, in general, the higher your ratio of flops to read/writes, the more benefits you can expect from parallelism. In your case, your code involves a decent number of read/writes (all of those accesses to your different arrays) for a relatively small number of flops (a few multiplactions and an addition). It's just something to keep in mind.
Fortunately, your case is amenable to some good speedups from parallelism if written correctly. In my experience with Julia, all of my most successful parallelism comes when I can break data up and have workers process chunks of it separately. Your case happens to be amenable to that. Below is an example of some code I wrote that does that. You can see that it gets nearly a 3x increase in speed going from one processor to three. The code a bit crude in places, but it at least demonstrates the general idea of how something like this could be approached. I give a few comments on the code afterwards.
addprocs(3)
nx = 250;
nz = 250;
nh = 50;
nt = 250;
@everywhere ncol = 100;
model = rand(nz,nx,nh);
data = SharedArray(Float64,nt,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);
function distribute_data(X, obj_name_on_worker::Symbol, dim)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
println(StartIdx:EndIdx)
if dim == 3
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
elseif dim == 2
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
end
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(model, :model, 3)
distribute_data(A, :A, 3)
distribute_data(B, :B, 3)
distribute_data(data, :data, 2)
@everywhere function Dummy(icol,model,data,A,B)
nx = size(model, 2)
nz = size(A,1)
nh = size(model, 3)
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
regular_test() = map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
function parallel_test()
@everywhere begin
if myid() != 1
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
end
end
@time regular_test(); # 2.120631 seconds (307 allocations: 11.313 KB)
@time parallel_test(); # 0.918850 seconds (5.70 k allocations: 337.250 KB)
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
function recombine_data(Data::Symbol)
Results = cell(nworkers())
for (idx, pid) in enumerate(workers())
Results[idx] = getfrom(pid, Data)
end
return vcat(Results...)
end
@time P_Data = recombine_data(:data); # 0.003132 seconds
P_Data == data ## true
Comments
The use of the SharedArray
is quite superfluous here. I just used it since it lends itself easily to being modified in place, which is how your code is originally written. This let me work more directly based on what you wrote without modifying it as much.
I didn't include the step to bring the data back in the time trial, but as you can see, it's quite a trivial amount of time in this case. In other situations, it might be less trivial, but data movement is just one of those issues that you face with parallelism.
When doing time trials in general, it is considered best practice to run the function once (in order to compile the code) and then run it again to get the times. That's what I did here.
See this SO post for where I got inspiration for some of these functions that I used here.
obj_name_on_worker::Symbol
. Is it necessary to use a symbol for this purpose, or could the same thing be achieved by using an array on each of the workers? Second, I am confused as to whydistribute_data
needs to be run for all of the arrays (rather than just the data). Intuitively (and probably naively) I would think that each worker would need access to the full model, A, and B. – Doerr