Annotation of OpenXM/src/kan96xx/plugin/sm1pvm.c, Revision 1.3
1.3 ! takayama 1: /* $OpenXM: OpenXM/src/kan96xx/plugin/sm1pvm.c,v 1.2 2000/01/16 07:55:48 takayama Exp $ */
1.1 maekawa 2: #include <stdio.h>
3: #include "pvm3.h"
4: #define SLAVENAME "slave3"
5:
6: #include "datatype.h"
7: #include "stackm.h"
8: #include "extern.h"
9: #include "extern2.h"
10: /* #include "lookup.h" */
11: #include "matrix.h"
12: #include "gradedset.h"
13: #include "sm1pvm.h"
14:
/* Capacity used for the slave bookkeeping arrays below. */
#define MAXHOSTS 32

/* Public entry points implemented in this file. */
int KpvmStartSlaves(char *name,int n);
int KpvmStopSlaves(void);
int KpvmChangeStateOfSlaves(int k);
int KpvmMcast(char *comm);
struct object KpvmJobPool(struct object obj);

/* Set to 1 to print debug messages. */
static int KpvmVerbose = 0;

/* Nonzero once the slaves have been spawned; reset to 0 when PVM is left. */
static int PvmStarted = 0;
/* Task id of this (master) process as returned by pvm_mytid(). */
static int Mytid;
/* Number of slave tasks requested from pvm_spawn(). */
static int Nproc;
/* Task ids of the slaves, filled in by pvm_spawn(). */
static int Tids[MAXHOSTS+1];
/* Not referenced anywhere in this file. */
static struct pvmhostinfo *Hostp[MAXHOSTS+1];
30:
31: KpvmStartSlaves(char *name,int nproc) {
32: int numt,i,info;
1.3 ! takayama 33: /* enroll in pvm */
! 34: Nproc = nproc;
! 35: Mytid = pvm_mytid();
! 36: if (Nproc > MAXHOSTS) {
! 37: Nproc = MAXHOSTS-1;
! 38: fprintf(stderr,"Too many tasks. It is set to %d\n",Nproc);
! 39: }
! 40: /* start up slave tasks */
! 41: numt=pvm_spawn(name, (char**)0, 0, "", Nproc, Tids);
! 42: if( numt < Nproc ){
! 43: fprintf(stderr,"Trouble spawning slaves. Aborting. Error codes are:\n");
! 44: for( i=numt ; i<Nproc ; i++ ) {
! 45: printf("TID %d %d\n",i,Tids[i]);
1.1 maekawa 46: }
1.3 ! takayama 47: for( i=0 ; i<numt ; i++ ){
! 48: pvm_kill( Tids[i] );
1.1 maekawa 49: }
1.3 ! takayama 50: pvm_exit();
! 51: PvmStarted = 0;
! 52: return(-1);
! 53: }
! 54: PvmStarted = 1;
! 55: return(0);
1.1 maekawa 56: }
57:
58: int KpvmMcast(char *comm) {
59: if (!PvmStarted) return(-1);
60: pvm_initsend(PvmDataDefault);
61: pvm_pkstr(comm);
62: if (pvm_mcast(Tids, Nproc, 0)<0) {
63: fprintf(stderr,"Error in mcast.\n");
64: pvm_exit(); return(-1);
65: PvmStarted = 0;
66: }
67: return(0);
68: }
69:
70: int KpvmStopSlaves() {
71: int dataId;
72: if (!PvmStarted) return(-1);
73: pvm_initsend(PvmDataDefault);
74: dataId = -1;
75: pvm_pkint(&dataId,1,1);
76: pvm_pkstr("HALT");
77: pvm_mcast(Tids, Nproc, 10);
78: /*
79: for (i=0; i<numt; i++) {
1.3 ! takayama 80: pvm_kill(Tids[i]);
1.1 maekawa 81: }
1.3 ! takayama 82: */
1.1 maekawa 83:
84: /* Program Finished exit PVM before stopping */
85: PvmStarted = 0;
86: pvm_exit();
87: }
88:
89: int KpvmChangeStateOfSlaves(int k) {
90: int dataId;
91: if (!PvmStarted) return(-1);
92: pvm_initsend(PvmDataDefault);
93: dataId = -1;
94: pvm_pkint(&dataId,1,1);
95: pvm_pkstr("LISTEN");
96: pvm_mcast(Tids, Nproc, 10);
97: /* The next command should be KpvmMcast(); */
98: }
99:
100: struct object KpvmJobPool(struct object obj) {
101: struct object rob;
102: struct object ob1,ob2;
103: int dataId,msgtype,remaining,ansp;
104: int bytes,tag,rtid;
105: int i,m;
106: char **darray;
107: char **aarray;
108: char ans[1024]; /* temporary work area. */
109: struct object op1;
110: int info;
111:
112: rob.tag = Snull;
113: if (!PvmStarted) return(rob);
114:
115: if (obj.tag != Sarray) {
116: fprintf(stderr,"Argument must be an array.\n");
117: return(rob);
118: }
119: m = getoaSize(obj);
120: darray = (char **) GC_malloc(sizeof(char *)*(m+1));
121: aarray = (char **) GC_malloc(sizeof(char *)*(m+1));
122: for (i=0; i<m; i++) {
123: ob1 = getoa(obj,i);
124: if (ob1.tag != Sdollar) {
125: fprintf(stderr,"Argument must be a string.\n");
126: return(rob);
127: }
128: darray[i] = ob1.lc.str;
129: }
130:
131:
132: /* Wait for results or ready signal from slaves */
133: msgtype = 5;
134: remaining = ansp = m;
135: while (ansp >= 0) {
136: if (KpvmVerbose) printf("Waiting for msgtype=5.\n");
137: info = pvm_recv( -1, msgtype );
138: pvm_bufinfo(info,&bytes,&tag,&rtid);
139: pvm_upkint(&dataId,1,1);
140: pvm_upkstr(ans);
141: if (strlen(ans) == 0) { /* slave is ready. */
142: if (KpvmVerbose) printf("Slave %d is ready.",rtid);
143: }else{
144: ansp -- ;
145: aarray[dataId] = (char *) GC_malloc(sizeof(char)*(strlen(ans)+2));
146: strcpy(aarray[dataId],ans);
147: if (KpvmVerbose) printf("[%d] %s from %d.\n",dataId,ans,rtid);
148: if (ansp <= 0) break;
149: }
150:
151: if (remaining > 0) {
152: remaining--;
153: pvm_initsend(PvmDataDefault); /* Always necessary to flush the old data. */
154: pvm_pkint(&remaining,1,1);
155: pvm_pkstr(darray[remaining]);
156: if (KpvmVerbose)
1.3 ! takayama 157: printf("Sending the message <<%s>> of the type 10.\n",darray[remaining]);
1.1 maekawa 158: pvm_send(rtid, 10);
159: }
160:
161: }
162:
163: printf("-------------------------------\n");
164: for (i=0; i<m; i++) {
165: if (KpvmVerbose) printf("%s\n",aarray[i]);
166: }
167: printf("------------------------------\n\n");
168:
169: rob = newObjectArray(m);
170: for (i=0; i<m; i++) {
171: op1.tag = Sdollar;
172: op1.lc.str = aarray[i];
173: putoa(rob,i,op1);
174: }
175: return(rob);
176: }
177:
#ifdef MSG1PTEST
/*
 * Stand-alone test driver, compiled only when MSG1PTEST is defined.
 * It starts three slaves, defines the macro "afo" on them, runs a pool of
 * five jobs, switches the slaves back to the listening state, defines
 * "afo2", runs a second pool and finally stops the slaves.
 * Returns 1 when the slaves cannot be started and 0 otherwise.
 */
int main(void)
{
  int m,i;
  int cmdsize;
  struct object obj;
  struct object op1;
  struct object rob;
  m = 5;
  /* Room for the longest command built below (" 1xx afo2 " needs 11
     bytes including the terminating NUL; the old size of 10 overflowed). */
  cmdsize = 32;

  if (KpvmStartSlaves(SLAVENAME,3)) return(1);
  KpvmMcast("/afo { /n set (x+1). n power [((x-1)^2).] reduction 0 get (string) dc} def ");

  obj = newObjectArray(m);
  for (i=0; i<m; i++) {
    op1.tag = Sdollar;
    op1.lc.str = (char *)GC_malloc(cmdsize);
    snprintf(op1.lc.str,cmdsize," %d afo ", 100 + i%10);
    putoa(obj,i,op1);
  }
  rob = KpvmJobPool(obj);
  printObject(rob,0,stdout); printf("\n");

  KpvmChangeStateOfSlaves(0);
  KpvmMcast("/afo2 { /n set n 1 add (dollar) dc } def ");

  obj = newObjectArray(m);
  for (i=0; i<m; i++) {
    op1.tag = Sdollar;
    op1.lc.str = (char *)GC_malloc(cmdsize);
    snprintf(op1.lc.str,cmdsize," %d afo2 ", 100 + i%10);
    putoa(obj,i,op1);
  }
  rob = KpvmJobPool(obj);
  printObject(rob,0,stdout); printf("\n");

  KpvmStopSlaves();
  return(0);
}
#endif
219:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>