[BACK]Return to sm1pvm.c CVS log [TXT][DIR] Up to [local] / OpenXM / src / kan96xx / plugin

Annotation of OpenXM/src/kan96xx/plugin/sm1pvm.c, Revision 1.4

1.4     ! ohara       1: /* $OpenXM: OpenXM/src/kan96xx/plugin/sm1pvm.c,v 1.3 2001/05/04 01:06:30 takayama Exp $ */
1.1       maekawa     2: #include <stdio.h>
                      3: #include "pvm3.h"
                      4: #define SLAVENAME "slave3"
                      5:
                      6: #include "datatype.h"
                      7: #include "stackm.h"
                      8: #include "extern.h"
                      9: #include "extern2.h"
                     10: /* #include "lookup.h" */
                     11: #include "matrix.h"
                     12: #include "gradedset.h"
                     13: #include "sm1pvm.h"
                     14:
                     15: static int KpvmVerbose = 0;   /* 1 is for outputting debug messages.  */
                     16:
                     17: /*    KSstart();    */
                     18: int KpvmStartSlaves(char *name,int n);
                     19: int KpvmStopSlaves(void);
                     20: int KpvmChangeStateOfSlaves(int k);
                     21: int KpvmMcast(char *comm);
                     22: struct object KpvmJobPool(struct object obj);
                     23:
                     24: #define MAXHOSTS 32
                     25: static int PvmStarted = 0;
                     26: static int Mytid;                  /* my task id */
                     27: static int Tids[MAXHOSTS+1];         /* slave ids */
                     28: static int Nproc;
                     29: static  struct pvmhostinfo *Hostp[MAXHOSTS+1];
                     30:
                     31: KpvmStartSlaves(char *name,int nproc) {
                     32:   int numt,i,info;
1.3       takayama   33:   /* enroll in pvm */
                     34:   Nproc = nproc;
                     35:   Mytid = pvm_mytid();
                     36:   if (Nproc > MAXHOSTS) {
                     37:     Nproc = MAXHOSTS-1;
                     38:     fprintf(stderr,"Too many tasks. It is set to %d\n",Nproc);
                     39:   }
                     40:   /* start up slave tasks */
                     41:   numt=pvm_spawn(name, (char**)0, 0, "", Nproc, Tids);
                     42:   if( numt < Nproc ){
                     43:     fprintf(stderr,"Trouble spawning slaves. Aborting. Error codes are:\n");
                     44:     for( i=numt ; i<Nproc ; i++ ) {
                     45:       printf("TID %d %d\n",i,Tids[i]);
1.1       maekawa    46:     }
1.3       takayama   47:     for( i=0 ; i<numt ; i++ ){
                     48:       pvm_kill( Tids[i] );
1.1       maekawa    49:     }
1.3       takayama   50:     pvm_exit();
                     51:     PvmStarted = 0;
                     52:     return(-1);
                     53:   }
                     54:   PvmStarted = 1;
                     55:   return(0);
1.1       maekawa    56: }
                     57:
                     58: int KpvmMcast(char *comm) {
                     59:   if (!PvmStarted) return(-1);
                     60:   pvm_initsend(PvmDataDefault);
                     61:   pvm_pkstr(comm);
                     62:   if (pvm_mcast(Tids, Nproc, 0)<0) {
                     63:     fprintf(stderr,"Error in mcast.\n");
                     64:     pvm_exit(); return(-1);
                     65:     PvmStarted = 0;
                     66:   }
                     67:   return(0);
                     68: }
                     69:
                     70: int KpvmStopSlaves() {
                     71:   int dataId;
                     72:   if (!PvmStarted) return(-1);
                     73:   pvm_initsend(PvmDataDefault);
                     74:   dataId = -1;
                     75:   pvm_pkint(&dataId,1,1);
                     76:   pvm_pkstr("HALT");
                     77:   pvm_mcast(Tids, Nproc, 10);
                     78:   /*
                     79:     for (i=0; i<numt; i++) {
1.3       takayama   80:     pvm_kill(Tids[i]);
1.1       maekawa    81:     }
1.3       takayama   82:   */
1.1       maekawa    83:
                     84:   /* Program Finished exit PVM before stopping */
                     85:   PvmStarted = 0;
                     86:   pvm_exit();
                     87: }
                     88:
                     89: int KpvmChangeStateOfSlaves(int k) {
                     90:   int dataId;
                     91:   if (!PvmStarted) return(-1);
                     92:   pvm_initsend(PvmDataDefault);
                     93:   dataId = -1;
                     94:   pvm_pkint(&dataId,1,1);
                     95:   pvm_pkstr("LISTEN");
                     96:   pvm_mcast(Tids, Nproc, 10);
                     97:   /* The next command should be KpvmMcast(); */
                     98: }
                     99:
                    100: struct object KpvmJobPool(struct object obj) {
                    101:   struct object rob;
                    102:   struct object ob1,ob2;
                    103:   int dataId,msgtype,remaining,ansp;
                    104:   int bytes,tag,rtid;
                    105:   int i,m;
                    106:   char **darray;
                    107:   char **aarray;
                    108:   char ans[1024];  /* temporary work area. */
                    109:   struct object op1;
                    110:   int info;
                    111:
                    112:   rob.tag = Snull;
                    113:   if (!PvmStarted) return(rob);
                    114:
                    115:   if (obj.tag != Sarray) {
                    116:     fprintf(stderr,"Argument must be an array.\n");
                    117:     return(rob);
                    118:   }
                    119:   m = getoaSize(obj);
                    120:   darray = (char **) GC_malloc(sizeof(char *)*(m+1));
                    121:   aarray = (char **) GC_malloc(sizeof(char *)*(m+1));
                    122:   for (i=0; i<m; i++) {
                    123:     ob1 = getoa(obj,i);
                    124:     if (ob1.tag != Sdollar) {
                    125:       fprintf(stderr,"Argument must be a string.\n");
                    126:       return(rob);
                    127:     }
                    128:     darray[i] = ob1.lc.str;
                    129:   }
                    130:
                    131:
                    132:   /* Wait for results or ready signal from slaves */
                    133:   msgtype = 5;
                    134:   remaining = ansp = m;
                    135:   while (ansp >= 0) {
                    136:     if (KpvmVerbose) printf("Waiting for msgtype=5.\n");
                    137:     info = pvm_recv( -1, msgtype );
                    138:     pvm_bufinfo(info,&bytes,&tag,&rtid);
                    139:     pvm_upkint(&dataId,1,1);
                    140:     pvm_upkstr(ans);
                    141:     if (strlen(ans) == 0) { /* slave is ready. */
                    142:       if (KpvmVerbose) printf("Slave %d is ready.",rtid);
                    143:     }else{
                    144:       ansp -- ;
                    145:       aarray[dataId] = (char *) GC_malloc(sizeof(char)*(strlen(ans)+2));
                    146:       strcpy(aarray[dataId],ans);
                    147:       if (KpvmVerbose) printf("[%d] %s from %d.\n",dataId,ans,rtid);
                    148:       if (ansp <= 0) break;
                    149:     }
                    150:
                    151:     if (remaining > 0) {
                    152:       remaining--;
                    153:       pvm_initsend(PvmDataDefault); /* Always necessary to flush the old data. */
                    154:       pvm_pkint(&remaining,1,1);
                    155:       pvm_pkstr(darray[remaining]);
                    156:       if (KpvmVerbose)
1.3       takayama  157:         printf("Sending the message <<%s>> of the type 10.\n",darray[remaining]);
1.1       maekawa   158:       pvm_send(rtid, 10);
                    159:     }
                    160:
                    161:   }
                    162:
                    163:   printf("-------------------------------\n");
                    164:   for (i=0; i<m; i++) {
                    165:     if (KpvmVerbose) printf("%s\n",aarray[i]);
                    166:   }
                    167:   printf("------------------------------\n\n");
                    168:
                    169:   rob = newObjectArray(m);
                    170:   for (i=0; i<m; i++) {
                    171:     op1.tag = Sdollar;
                    172:     op1.lc.str = aarray[i];
                    173:     putoa(rob,i,op1);
                    174:   }
                    175:   return(rob);
                    176: }
                    177:
                    178: #ifdef MSG1PTEST
                    179: main()
                    180: {
                    181:   int m,i;
                    182:   struct object obj;
                    183:   struct object op1;
                    184:   struct object rob;
                    185:   m = 5;
                    186:
                    187:
1.4     ! ohara     188:   if (KpvmStartSlaves(SLAVENAME,3)) exit(0);
1.1       maekawa   189:   KpvmMcast("/afo { /n  set (x+1). n power [((x-1)^2).] reduction 0 get (string) dc} def ");
                    190:
                    191:   obj = newObjectArray(m);
                    192:   for (i=0; i<m; i++) {
                    193:     op1.tag = Sdollar;
                    194:     op1.lc.str = (char *)GC_malloc(10);
                    195:     sprintf(op1.lc.str," %d afo ", 100 + i%10);
                    196:     putoa(obj,i,op1);
                    197:   }
                    198:   rob = KpvmJobPool(obj);
                    199:   printObject(rob,0,stdout); printf("\n");
                    200:
                    201:   KpvmChangeStateOfSlaves(0);
                    202:   KpvmMcast("/afo2 { /n  set n 1 add (dollar) dc } def ");
                    203:
                    204:   obj = newObjectArray(m);
                    205:   for (i=0; i<m; i++) {
                    206:     op1.tag = Sdollar;
                    207:     op1.lc.str = (char *)GC_malloc(10);
                    208:     sprintf(op1.lc.str," %d afo2 ", 100 + i%10);
                    209:     putoa(obj,i,op1);
                    210:   }
                    211:   rob = KpvmJobPool(obj);
                    212:   printObject(rob,0,stdout); printf("\n");
                    213:
                    214:
                    215:
                    216:   KpvmStopSlaves();
                    217: }
                    218: #endif
                    219:

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>