Skip to content

Commit ca14c64

Browse files
committed
Parallel array ops.
1 parent d735c52 commit ca14c64

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

parallel_algorithm.d

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,102 @@ unittest {
810810
auto bar = iota(666);
811811
assert(parallelAdjacentFind(foo) == [8, 8, 0, 9, 6]);
812812
assert(parallelAdjacentFind(bar).empty);
813+
}
814+
815+
string parallelArrayOp
816+
(string workUnitSize = "1_000", string pool = "taskPool")
817+
(string[] operation...) {
818+
string[] evens, odds;
819+
820+
foreach(i, op; operation) {
821+
if(i & 1) {
822+
odds ~= '"' ~ op ~ '"';
823+
} else {
824+
evens ~= op;
825+
}
826+
}
827+
828+
return "arrayOpImpl!(" ~ std.array.join(odds, ", ") ~
829+
").parallelArrayOp(" ~
830+
std.array.join(evens, ", ") ~ ", " ~ workUnitSize ~ ", " ~
831+
pool ~ ");";
832+
}
833+
834+
template arrayOpImpl(ops...) {
835+
// Code generation function for parallelArrayOp.
836+
private string generateChunkCode(Operands...)() {
837+
static assert(isArray!(Operands[0]),
838+
"Cannot do parallel array ops if the lhs of the expression is " ~
839+
"not an array."
840+
);
841+
842+
import std.string;
843+
844+
string ret = `
845+
immutable len = operands[0].length;
846+
immutable chunkStart = myChunk * workUnitSize;
847+
if(chunkStart >= len) break;
848+
immutable chunkEnd = min(chunkStart + workUnitSize, len);
849+
operands[0][chunkStart..chunkEnd] `;
850+
851+
assert(std.string.indexOf(ops[0], "=") >-1,
852+
"Cannot do a parallel array op where the second argument is not " ~
853+
"some kind of assignment statement. (Got: " ~ ops[0] ~ ")."
854+
);
855+
856+
foreach(i, op; ops) {
857+
ret ~= op;
858+
859+
ret ~= " operands[" ~ to!string(i + 1) ~ "]";
860+
if(isArray!(Operands[i + 1])) {
861+
ret ~= "[chunkStart..chunkEnd] ";
862+
}
863+
}
864+
865+
ret ~= ';';
866+
return ret;
867+
}
868+
869+
void parallelArrayOp(Operands...)(
870+
Operands operands,
871+
size_t workUnitSize,
872+
TaskPool pool
873+
) if(Operands.length >= 2) {
874+
if(pool is null) pool = taskPool;
875+
immutable nThread = pool.size + 1;
876+
size_t workUnitIndex = size_t.max;
877+
878+
foreach(thread; parallel(iota(nThread), 1)) {
879+
while(true) {
880+
immutable myChunk = atomicOp!"+="(workUnitIndex, 1);
881+
enum code = generateChunkCode!(Operands)();
882+
mixin(code);
883+
}
884+
}
885+
}
886+
}
887+
888+
unittest {
889+
{
890+
auto lhs = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11];
891+
auto lhs2 = lhs.dup;
892+
mixin(parallelArrayOp!("2")("lhs[]", "*=", "2"));
893+
lhs2[] *= 2;
894+
assert(equal(lhs, lhs2), text(lhs));
895+
}
896+
897+
{
898+
double[] lhs = new double[7];
899+
double[] o1 = [8, 6, 7, 5, 3, 0, 9];
900+
double[] o2 = [3, 1, 4, 1, 5, 9, 2];
901+
double[] o3 = [2, 7, 1, 8, 2, 8, 1];
902+
903+
mixin(parallelArrayOp!"2"(
904+
"lhs[]", "=", "o1[]", "*", "o2[]", "+", "2", "*", "o3[]"
905+
));
906+
907+
assert(lhs == [28, 20, 30, 21, 19, 16, 20]);
908+
}
813909
}
814910

815911
//////////////////////////////////////////////////////////////////////////////
@@ -1044,7 +1140,36 @@ void findBenchmark() {
10441140
writeln("Parallel adjacent find: ", sw.peek.msecs);
10451141
}
10461142

1143+
void arrayOpBenchmark() {
1144+
enum n = 10_000;
1145+
enum nIter = 5_000;
1146+
1147+
auto lhs = new double[n];
1148+
auto op1 = new double[n];
1149+
auto op2 = new double[n];
1150+
auto op3 = new double[n];
1151+
1152+
foreach(ref elem; chain(lhs, op1, op2, op3)) {
1153+
elem = uniform(0.0, 1.0);
1154+
}
1155+
1156+
auto sw = StopWatch(AutoStart.yes);
1157+
foreach(iter; 0..nIter) {
1158+
lhs[] = op1[] * op2[] / op3[];
1159+
}
1160+
writeln("Serial array multiply: ", sw.peek.msecs);
1161+
1162+
sw.reset();
1163+
foreach(iter; 0..nIter) {
1164+
mixin(parallelArrayOp(
1165+
"lhs[]", "=", "op1[]", "*", "op2[]", "/", "op3[]")
1166+
);
1167+
}
1168+
writeln("Parallel array multiply: ", sw.peek.msecs);
1169+
}
1170+
10471171
void main() {
1172+
arrayOpBenchmark();
10481173
mergeBenchmark();
10491174
mergeInPlaceBenchmark();
10501175
sortBenchmark();

0 commit comments

Comments
 (0)