Annotation of OpenXM/src/kan96xx/plugin/sm1pvm.c, Revision 1.1.1.1
1.1 maekawa 1: #include <stdio.h>
2: #include "pvm3.h"
3: #define SLAVENAME "slave3"
4:
5: #include "datatype.h"
6: #include "stackm.h"
7: #include "extern.h"
8: #include "extern2.h"
9: /* #include "lookup.h" */
10: #include "matrix.h"
11: #include "gradedset.h"
12: #include "sm1pvm.h"
13:
/* Debug switch: when nonzero, the functions in this file print trace
   messages to stdout. */
static int KpvmVerbose = 0;

/* KSstart(); */
/* Functions defined in this file. */
int KpvmStartSlaves(char *name,int nproc);
int KpvmStopSlaves(void);
int KpvmChangeStateOfSlaves(int k);
int KpvmMcast(char *comm);
struct object KpvmJobPool(struct object obj);

/* Upper bound used when sizing the per-slave tables below. */
#define MAXHOSTS 32
static int PvmStarted = 0;   /* set to 1 by KpvmStartSlaves on success,
                                reset to 0 when pvm_exit() is called */
static int Mytid;            /* my task id (value of pvm_mytid()) */
static int Tids[MAXHOSTS+1]; /* slave ids, filled by pvm_spawn() */
static int Nproc;            /* number of slaves requested from pvm_spawn() */
static struct pvmhostinfo *Hostp[MAXHOSTS+1]; /* not referenced by the
                                                 functions in this file */
29:
30: KpvmStartSlaves(char *name,int nproc) {
31: int numt,i,info;
32: /* enroll in pvm */
33: Nproc = nproc;
34: Mytid = pvm_mytid();
35: if (Nproc > MAXHOSTS) {
36: Nproc = MAXHOSTS-1;
37: fprintf(stderr,"Too many tasks. It is set to %d\n",Nproc);
38: }
39: /* start up slave tasks */
40: numt=pvm_spawn(name, (char**)0, 0, "", Nproc, Tids);
41: if( numt < Nproc ){
42: fprintf(stderr,"Trouble spawning slaves. Aborting. Error codes are:\n");
43: for( i=numt ; i<Nproc ; i++ ) {
44: printf("TID %d %d\n",i,Tids[i]);
45: }
46: for( i=0 ; i<numt ; i++ ){
47: pvm_kill( Tids[i] );
48: }
49: pvm_exit();
50: PvmStarted = 0;
51: return(-1);
52: }
53: PvmStarted = 1;
54: return(0);
55: }
56:
57: int KpvmMcast(char *comm) {
58: if (!PvmStarted) return(-1);
59: pvm_initsend(PvmDataDefault);
60: pvm_pkstr(comm);
61: if (pvm_mcast(Tids, Nproc, 0)<0) {
62: fprintf(stderr,"Error in mcast.\n");
63: pvm_exit(); return(-1);
64: PvmStarted = 0;
65: }
66: return(0);
67: }
68:
69: int KpvmStopSlaves() {
70: int dataId;
71: if (!PvmStarted) return(-1);
72: pvm_initsend(PvmDataDefault);
73: dataId = -1;
74: pvm_pkint(&dataId,1,1);
75: pvm_pkstr("HALT");
76: pvm_mcast(Tids, Nproc, 10);
77: /*
78: for (i=0; i<numt; i++) {
79: pvm_kill(Tids[i]);
80: }
81: */
82:
83: /* Program Finished exit PVM before stopping */
84: PvmStarted = 0;
85: pvm_exit();
86: }
87:
88: int KpvmChangeStateOfSlaves(int k) {
89: int dataId;
90: if (!PvmStarted) return(-1);
91: pvm_initsend(PvmDataDefault);
92: dataId = -1;
93: pvm_pkint(&dataId,1,1);
94: pvm_pkstr("LISTEN");
95: pvm_mcast(Tids, Nproc, 10);
96: /* The next command should be KpvmMcast(); */
97: }
98:
99: struct object KpvmJobPool(struct object obj) {
100: struct object rob;
101: struct object ob1,ob2;
102: int dataId,msgtype,remaining,ansp;
103: int bytes,tag,rtid;
104: int i,m;
105: char **darray;
106: char **aarray;
107: char ans[1024]; /* temporary work area. */
108: struct object op1;
109: int info;
110:
111: rob.tag = Snull;
112: if (!PvmStarted) return(rob);
113:
114: if (obj.tag != Sarray) {
115: fprintf(stderr,"Argument must be an array.\n");
116: return(rob);
117: }
118: m = getoaSize(obj);
119: darray = (char **) GC_malloc(sizeof(char *)*(m+1));
120: aarray = (char **) GC_malloc(sizeof(char *)*(m+1));
121: for (i=0; i<m; i++) {
122: ob1 = getoa(obj,i);
123: if (ob1.tag != Sdollar) {
124: fprintf(stderr,"Argument must be a string.\n");
125: return(rob);
126: }
127: darray[i] = ob1.lc.str;
128: }
129:
130:
131: /* Wait for results or ready signal from slaves */
132: msgtype = 5;
133: remaining = ansp = m;
134: while (ansp >= 0) {
135: if (KpvmVerbose) printf("Waiting for msgtype=5.\n");
136: info = pvm_recv( -1, msgtype );
137: pvm_bufinfo(info,&bytes,&tag,&rtid);
138: pvm_upkint(&dataId,1,1);
139: pvm_upkstr(ans);
140: if (strlen(ans) == 0) { /* slave is ready. */
141: if (KpvmVerbose) printf("Slave %d is ready.",rtid);
142: }else{
143: ansp -- ;
144: aarray[dataId] = (char *) GC_malloc(sizeof(char)*(strlen(ans)+2));
145: strcpy(aarray[dataId],ans);
146: if (KpvmVerbose) printf("[%d] %s from %d.\n",dataId,ans,rtid);
147: if (ansp <= 0) break;
148: }
149:
150: if (remaining > 0) {
151: remaining--;
152: pvm_initsend(PvmDataDefault); /* Always necessary to flush the old data. */
153: pvm_pkint(&remaining,1,1);
154: pvm_pkstr(darray[remaining]);
155: if (KpvmVerbose)
156: printf("Sending the message <<%s>> of the type 10.\n",darray[remaining]);
157: pvm_send(rtid, 10);
158: }
159:
160: }
161:
162: printf("-------------------------------\n");
163: for (i=0; i<m; i++) {
164: if (KpvmVerbose) printf("%s\n",aarray[i]);
165: }
166: printf("------------------------------\n\n");
167:
168: rob = newObjectArray(m);
169: for (i=0; i<m; i++) {
170: op1.tag = Sdollar;
171: op1.lc.str = aarray[i];
172: putoa(rob,i,op1);
173: }
174: return(rob);
175: }
176:
#ifdef MSG1PTEST
/* Size of the buffer of one job string built by KpvmTestJobs(). */
#define KPVM_TEST_JOBLEN 32

/*
 * Build an array of m job strings of the form " <number> <command> ",
 * where <number> is 100 + i%10 for the i-th job.
 * The string is written with snprintf() into a buffer of
 * KPVM_TEST_JOBLEN bytes, so a long command is truncated instead of
 * overrunning the buffer.
 */
static struct object KpvmTestJobs(int m,char *command) {
  struct object obj;
  struct object op1;
  int i;
  obj = newObjectArray(m);
  for (i=0; i<m; i++) {
    op1.tag = Sdollar;
    op1.lc.str = (char *)GC_malloc(KPVM_TEST_JOBLEN);
    if (op1.lc.str == (char *)NULL) {
      /* KpvmJobPool() rejects arrays that contain a non-string element. */
      op1.tag = Snull;
    }else{
      snprintf(op1.lc.str,KPVM_TEST_JOBLEN," %d %s ", 100 + i%10, command);
    }
    putoa(obj,i,op1);
  }
  return(obj);
}

/*
 * Test driver: starts 3 slaves, defines the macro afo on them, runs a pool
 * of 5 jobs, switches the slaves with KpvmChangeStateOfSlaves(), defines
 * afo2, runs a second pool and finally stops the slaves.
 * Returns 1 when the slaves cannot be started and 0 otherwise.
 */
int main(void)
{
  int m;
  struct object obj;
  struct object rob;
  m = 5;


  if (KpvmStartSlaves(SLAVENAME,3)) return(1);
  KpvmMcast("/afo { /n set (x+1). n power [((x-1)^2).] reduction 0 get (string) dc} def ");

  obj = KpvmTestJobs(m,"afo");
  rob = KpvmJobPool(obj);
  printObject(rob,0,stdout); printf("\n");

  KpvmChangeStateOfSlaves(0);
  KpvmMcast("/afo2 { /n set n 1 add (dollar) dc } def ");

  obj = KpvmTestJobs(m,"afo2");
  rob = KpvmJobPool(obj);
  printObject(rob,0,stdout); printf("\n");



  KpvmStopSlaves();
  return(0);
}
#endif
218:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>