[BACK]Return to oh_parallel.rr CVS log [TXT][DIR] Up to [local] / OpenXM / src / asir-contrib / packages / src

File: [local] / OpenXM / src / asir-contrib / packages / src / oh_parallel.rr (download)

Revision 1.5, Mon Nov 24 22:41:05 2008 UTC (15 years, 6 months ago) by ohara
Branch: MAIN
CVS Tags: R_1_3_1-2, RELEASE_1_3_1_13b, RELEASE_1_2_3_12, HEAD
Changes since 1.4: +6 -4 lines

A bug fixed

/* $OpenXM: OpenXM/src/asir-contrib/packages/src/oh_parallel.rr,v 1.5 2008/11/24 22:41:05 ohara Exp $ */
/* A support library for parallel computation */

/* Start of module oh_parallel; it is closed by the endmodule statement below. */
module oh_parallel$

/* Every name listed in these localf declarations has a matching def
   further down in this module. */
localf open, close;
localf load_library, load_string;
localf set_env, get_env;
localf load_env, save_env;
localf execute_function;
localf compute;
localf ox_restart;

/*
  open(Node): launch Node servers and return the list of their ids.
  Options:
    x  : given with any value other than no -> servers are started with
         ox_launch; absent or x=no -> servers are started with ox_launch_nox.
    cd : unless cd=no, "chdir" is called on every server with the value of
         ::pwd() on the client side, and the results are popped.
  Example: open(Node|cd=no,x=yes)
*/
def open(Node) {
    local IdSet, Launch, I, OptX;
    OptX = getopt(x);
    /* type(OptX)>=0 tests that the option was supplied.  x=no must not
       select ox_launch, in the same way that cd=no is honoured below. */
    Launch = (type(OptX)>=0 && OptX!=no)? ox_launch: ox_launch_nox;
    IdSet=newvect(Node);
    for(I=0; I<Node; I++) {
        IdSet[I] = (*Launch)();
    }
    if (getopt(cd)!=no) {
        /* Send the request to all servers first, then collect the results. */
        map(ox_rpc,IdSet,"chdir",::pwd());
        map(ox_pop_local,IdSet);
    }
    return vtol(IdSet);
}

/*
  close(IdSet): apply ox_shutdown to IdSet through map.
  IdSet is expected to be the collection of server ids returned by open.
*/
def close(IdSet) {
    map(ox_shutdown, IdSet);
}

/*
  load_library(IdSet,Lib): call "load" with argument Lib on every server
  in IdSet via ox_rpc, then pop one result from each server.
*/
def load_library(IdSet,Lib) {
    /* The request goes to all servers before any result is popped. */
    map(ox_rpc,IdSet,"load",Lib);
    map(ox_pop_local,IdSet);
}

/*
  load_string(IdSet,String): pass String to ox_execute_string for every
  server in IdSet, then pop one result from each server.
*/
def load_string(IdSet,String) {
    /* The string goes to all servers before any result is popped. */
    map(ox_execute_string,IdSet,String);
    map(ox_pop_local,IdSet);
}

/*
  execute_function(IdSet,FuncArgs): for each id in IdSet, in order, call
  ox_rpc with the argument list cons(id,FuncArgs).
  FuncArgs has the form ["func",Arg1,Arg2,...].
  No result is popped here.
*/
def execute_function(IdSet,FuncArgs) {
    local Count, K;
    Count = length(IdSet);
    K = 0;
    while (K < Count) {
        call(ox_rpc, cons(IdSet[K], FuncArgs));
        K++;
    }
}

/*
  set_env(IdSet,EnvName,Env): on every server in IdSet,
    1. execute the string "extern <EnvName>;",
    2. push Env with ox_push_local,
    3. call ox_setname with EnvName.
  EnvName is a string (it is concatenated with string literals).
*/
def set_env(IdSet,EnvName,Env) {
    local Decl;
    Decl = "extern "+EnvName+";";
    map(ox_execute_string,IdSet,Decl);
    map(ox_push_local,IdSet,Env);
    map(ox_setname,IdSet,EnvName);
}

/*
  get_env(IdSet,EnvName): execute the string "<EnvName>;" on every server
  in IdSet and return what ox_pop_local yields for each of them.
  EnvName is a string.
*/
def get_env(IdSet,EnvName) {
    local Query, Values;
    Query = EnvName+";";
    map(ox_execute_string,IdSet,Query);
    Values = map(ox_pop_local,IdSet);
    return Values;
}

/*
  load_env(IdSet,Env,ValName): on every server in IdSet, call "bload" on the
  file "<Env>.sav" (the same file name that save_env writes), then call
  ox_setname with ValName, then pop once from each server.
  Example: load_env([1,2,3],'Env',"A");
*/
def load_env(IdSet,Env,ValName) {
    map(ox_rpc,IdSet,"bload",sprintf("%a.sav", Env));
    map(ox_setname,IdSet,ValName);
    /* NOTE(review): this pop follows ox_setname; presumably it only serves
       to wait for the servers -- confirm. */
    map(ox_pop_local,IdSet);
}

/*
  save_env(Env,Data): write Data with bsave to the file "<Env>.sav".
  Example: save_env('Env',A);
*/
def save_env(Env,Data) {
    bsave(Data, sprintf("%a.sav", Env));
}

/*
  ox_restart(Id): shut down server Id, launch one new server through open
  (forwarding all options given to this call) and return the new server's id.
  Callers assume that the new server receives the same identity number
  as the one that was shut down!
*/
def ox_restart(Id) {
    local Opts, Fresh;
    Opts = getopt();
    ox_shutdown(Id);
    Fresh = open(1|option_list=Opts);
    return Fresh[0];
}

/*
oh_parallel.compute(IdSet,TagSet,"foo",[Arg1,Arg2,...]
  |debug=yes,   (print progress messages)
   restart=yes,initf=["bar",a,b],
                (a server is restarted after each result it delivers and
                 is then initialized by bar(a,b))
   div=F,       (the arguments sent along with Tag are F(Tag,TagSet,Args)
                 instead of Args)
   merge=G      (the result vector Ret is replaced by G(Ret))
   );

For each Tag in TagSet, foo(Tag,Arg1,Arg2,...) is requested on a server of
IdSet that is not currently running a request.  foo must return
[Tag,Value]; Value is stored at the position of Tag in TagSet.

Returns the results as a list.  If a merge function returns something
other than a vector, that value is returned unchanged.

All options are also forwarded to ox_restart.  If a restarted server comes
back with a different id, the new id is used for the rest of this call;
the caller's own IdSet is not updated.
*/
def compute(IdSet,TagSet,Func,Args) {
    local Ret, Node, Unused, Running, Id, Recv, Tag, FuncArgs, Pos, Debug;
    local Restart, InitFuncArgs, Opts;
    local DivFunc, MergeFunc, A, NewId;
    Opts=getopt();
    Restart = (getopt(restart)==yes)? 1: 0;
    InitFuncArgs = getopt(initf);
    Debug = (getopt(debug)==yes)? 1: 0; /* for debug */
    DivFunc = getopt(div);
    MergeFunc  = getopt(merge);
    Ret = newvect(length(TagSet));
    Node = length(IdSet); /* Node == #(servers) */
    if (Node==0 && TagSet!=[]) {
        /* Without this check the first dispatch fails on an empty list. */
        error("oh_parallel.compute: no servers available");
    }
    Running=[];     /* the list of identity numbers of running servers */
    Unused = TagSet;
    while(Unused!=[] || Running!=[]) {
        if (Unused!=[]) {
            /* Dispatch the next tag to a server that is not running. */
            Tag=car(Unused); Unused=cdr(Unused);
            Id = base_set_minus(IdSet,Running)[0]; /* Unused server */
            if (type(DivFunc)==2) {
                A = (*DivFunc)(Tag,TagSet,Args);
            }else {
                A = Args;
            }
            FuncArgs = append([Id,Func,Tag],A);
            if(Debug) {
                printf("oh_parallel.compute(%a): ox_rpc<%a,%a>\n",Func,Id,Tag);
            }
            call(ox_rpc, FuncArgs);
            ox_push_cmd(Id,258); /* SM_popSerializedLocalObject */
            Running=cons(Id,Running);
        }
        /* Collect a result when every server is busy or nothing is left
           to dispatch; this guarantees a free server for the next tag. */
        if (length(Running)>=Node || Unused==[]) {
            Id=ox_select(Running)[0];
            Recv=ox_get(Id); /* Recv == [Tag, ReturnValue] */
            if(Debug) {
                printf("oh_parallel.compute(%a): Recv<%a>=%a\n",Func,Id,Recv);
            }
            Pos = base_position(Recv[0],TagSet);
            if(Debug) {
                printf("oh_parallel.compute(%a): Pos=%a,Tag=%a,TagSet=%a\n",Func,Pos,Recv[0],TagSet);
            }
            Ret[Pos]=Recv[1];
            Running=base_prune(Id,Running);
            if(Restart) { /* Restarting server */
                NewId = ox_restart(Id|option_list=Opts);
                if (NewId != Id) {
                    /* The identity number was not preserved: keep working
                       with the id of the server that actually exists. */
                    IdSet = cons(NewId, base_prune(Id,IdSet));
                    Id = NewId;
                }
                if(type(InitFuncArgs)==4) {
                    execute_function([Id],InitFuncArgs);
                }
            }
        }
    }
    if(type(MergeFunc)==2) {
        Ret=(*MergeFunc)(Ret);
    }
    /* Only a vector (type 5) is converted; a merge function may return
       a list or any other object, which is passed through as is. */
    return (type(Ret)==5)? vtol(Ret): Ret;
}

/* End of module oh_parallel. */
endmodule$

end$