/* $OpenXM: OpenXM/src/asir-contrib/packages/src/oh_parallel.rr,v 1.5 2008/11/24 22:41:05 ohara Exp $ */
/* A support library for parallel computation */
module oh_parallel$
localf open, close;
localf load_library, load_string;
localf set_env, get_env;
localf load_env, save_env;
localf execute_function;
localf compute;
localf ox_restart;
/*
Example: open(Node|cd=no,x=yes)
*/
def open(Node) {
local IdSet, Launch, I;
IdSet=newvect(Node);
Launch = (type(getopt(x))>=0)? ox_launch: ox_launch_nox;
for(I=0; I<Node; I++) {
IdSet[I] = (*Launch)();
}
if (getopt(cd)!=no) {
map(ox_rpc,IdSet,"chdir",::pwd());
map(ox_pop_local,IdSet);
}
return vtol(IdSet);
}
def close(IdSet) {
map(ox_shutdown, IdSet);
}
def load_library(IdSet,Lib) {
map(ox_rpc,IdSet,"load",Lib);
map(ox_pop_local,IdSet);
}
def load_string(IdSet,String) {
map(ox_execute_string,IdSet,String);
map(ox_pop_local,IdSet);
}
def execute_function(IdSet,FuncArgs) {
Node=length(IdSet);
for(I=0; I<Node; I++) {
call(ox_rpc, cons(IdSet[I], FuncArgs));
}
}
def set_env(IdSet,EnvName,Env) {
map(ox_execute_string,IdSet,"extern "+EnvName+";");
map(ox_push_local,IdSet,Env);
map(ox_setname,IdSet,EnvName);
}
def get_env(IdSet,EnvName) {
map(ox_execute_string,IdSet,EnvName+";");
return map(ox_pop_local,IdSet);
}
/* load_env([1,2,3],'Env',"A"); */
def load_env(IdSet,Env,ValName) {
local File;
File = sprintf("%a.sav", Env);
map(ox_rpc,IdSet,"bload",File);
map(ox_setname,IdSet,ValName);
map(ox_pop_local,IdSet);
}
/* load_env('Env',A); */
def save_env(Env,Data) {
local File;
File = sprintf("%a.sav", Env);
bsave(Data,File);
}
/* We assume that the identity number of an openxm server is preserved! */
def ox_restart(Id) {
ox_shutdown(Id);
return open(1|option_list=getopt())[0];
}
/*
oh.parallel.compute(IdSet,TagSet,"foo",[Arg1,Arg2,...]
|debug=yes, (for printing messages)
restart=yes,initf=["bar",a,b] (servers will restart and be initilized by bar(a,b))
);
*/
def compute(IdSet,TagSet,Func,Args) {
local Ret, Node, Unused, Running, Id, Recv, Tag, FuncArgs, Pos, Debug;
local Restart, InitFuncArgs, Opts;
local DivFunc, MergeFunc, A;
Opts=getopt();
Restart = (getopt(restart)==yes)? 1: 0;
InitFuncArgs = getopt(initf);
Debug = (getopt(debug)==yes)? 1: 0; /* for debug */
DivFunc = getopt(div);
MergeFunc = getopt(merge);
Ret = newvect(length(TagSet));
Node = length(IdSet); /* Node == #(servers) */
Running=[]; /* the list of identity numbers of running servers */
Unused = TagSet;
while(Unused!=[] || Running!=[]) {
if (Unused!=[]) {
Tag=car(Unused); Unused=cdr(Unused);
Id = base_set_minus(IdSet,Running)[0]; /* Unused server */
if (type(DivFunc)==2) {
A = (*DivFunc)(Tag,TagSet,Args);
}else {
A = Args;
}
FuncArgs = append([Id,Func,Tag],A);
if(Debug) {
printf("oh_parallel.compute(%a): ox_rpc<%a,%a>\n",Func,Id,Tag);
}
call(ox_rpc, FuncArgs);
ox_push_cmd(Id,258); /* SM_popSerializedLocalObject */
Running=cons(Id,Running);
}
if (length(Running)>=Node || Unused==[]) {
Id=ox_select(Running)[0];
Recv=ox_get(Id); /* Recv == [Tag, ReturnValue] */
if(Debug) {
printf("oh_parallel.compute(%a): Recv<%a>=%a\n",Func,Id,Recv);
}
Pos = base_position(Recv[0],TagSet);
if(Debug) {
printf("oh_parallel.compute(%a): Pos=%a,Tag=%a,TagSet=%a\n",Func,Pos,Recv[0],TagSet);
}
Ret[Pos]=Recv[1];
Running=base_prune(Id,Running);
if(Restart) { /* Restarting server */
ox_restart(Id|option_list=Opts);
if(type(InitFuncArgs)==4) {
execute_function([Id],InitFuncArgs);
}
}
}
}
if(type(MergeFunc)==2) {
Ret=(*MergeFunc)(Ret);
}
return vtol(Ret);
}
endmodule$
end$