ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/JSOC/proj/lev0/apps/build_lev1_mgr_jim.c
Revision: 1.1
Committed: Tue Apr 22 17:59:12 2014 UTC (9 years, 5 months ago) by prodtest
Content type: text/plain
Branch: MAIN
CVS Tags: Ver_8-8, Ver_8-11, Ver_8-10, Ver_8-6, Ver_LATEST, Ver_9-41, Ver_8-5, Ver_8-12, Ver_8-7, Ver_9-5, Ver_9-4, Ver_9-3, Ver_9-2, Ver_9-1, Ver_9-0, HEAD
Log Message:
*** empty log message ***

File Contents

# Content
1 /*-----------------------------------------------------------------------------
2 * cvs/JSOC/proj/lev0/apps/build_lev1_mgr.c
3 *-----------------------------------------------------------------------------
4 *
5 * This is a stand alone module that processes lev0
6 * filtergrams to lev1 by running the build_lev1 module.
7 * (See "Normal Calls:" below for normal running calls)
8 *
9 *build_lev1_mgr
10 * mode= recnum or fsn
11 * instru= hmi or aia
12 * dsin= default hmi.lev0e or aia.lev0e
13 * dsout= hmi.lev1e or aia.lev1e
14 * brec= first lev0 record# to process
15 * erec= last lev0 record# to process
16 * or
17 * bfsn= first lev0 fsn# to process
18 * efsn= last lev0 fsn# to process
19 *
20 * Runs build_lev1 processes to create lev1 datasets
21 * Has a mode to specify lev0 input by record# or fsn. Also if in
22 * mode=recnum and brec=0 and erec=0 then runs in Stream Mode:
23 * Stream Mode (one instance):
24 * This is the normal quick look mode that runs continuously and
25 * keeps up with the lev0 records.
26 * brec=0, erec=0
27 * - start at the previous highest lev0 record processed
28 * This is kept in the DB table lev1_highest_lev0_recnum
29 * - fork from 8 to MAXCPULEV1 build_lev1 for every
30 * 12 (NUMRECLEV1) lev0 records.
31 * - when a build_lev1 completes, fork another for next 12 rec
32 * - if no more lev0 records available, sleep and try again
33 * - if 8 CPU not enough to keep up with lev0, go to 16, etc.
34 *
35 * Recnum/fsn range Mode (any number of instances):
36 * This is lev1 processing by a lev0 range. Does a consistency check between mode= and
37 * the use of brec/erec or bfsn/efsn, e.g.:
38 * brec=1000, erec=2000 or bfsn=4836500, efsn=4836590
39 * - qsub up to 16 (MAXQSUBLEV1) build_lev1 for 12 records ea
40 * - when a job completes qsub next 12 records until erec or efsn is reached
41 * - when all jobs are done, build_lev1_mgr will exit
42 *
43 */
44 /* Normal Calls:
45 * For hmi call on cl1n002 as user jsocprod:
46 * build_lev1_mgr mode=recnum instru=hmi dsin=hmi.lev0a dsout=hmi.lev1_nrt
47 * brec=0 erec=0
48 * For aia call on cl1n003 as user jsocprod:
49 * build_lev1_mgr mode=recnum instru=aia dsin=aia.lev0 dsout=aia.lev1_nrt2
50 * brec=0 erec=0
51 * For iris call on cl1n001 as user jsocprod:
52 * build_lev1_mgr mode=recnum instru=iris dsin=iris_ground.lev0_dc1
53 * dsout=iris_ground.lev1_dc1 brec=0 erec=0
54 */
55
56 #include <jsoc.h>
57 #include <cmdparams.h>
58 #include <drms.h>
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <strings.h>
62 #include <signal.h>
63 #include <sys/types.h>
64 #include <sys/time.h>
65 #include <sys/stat.h> //for umask(2)
66 #include <unistd.h> //for alarm(2) among other things...
67 #include <printk.h>
68 #include <errno.h>
69 #include <sys/wait.h>
70 #include "lev0lev1.h" //defines NUMRECLEV1. Used by this and build_lev1.c
71
//default in and out data series
#define LEV0SERIESNAMEHMI "hmi.lev0e"
#define LEV0SERIESNAMEAIA "aia.lev0e"
#define LEV0SERIESNAMEIRIS "iris_ground.lev0_dc1"
#define LEV1SERIESNAMEHMI "su_production.hmi_lev1e" //temp test case
#define LEV1SERIESNAMEAIA "su_production.aia_lev1e" //temp test case
#define LEV1SERIESNAMEIRIS "iris_ground.lev1_dc1"
#define DSAIABAD "lm_jps.aia_bad_blobs" //dsaiabad= arg value to build_lev1 for aia. Eliminated by jim on 03Mar2011

#define LEV1LOG_BASEDIR "/usr/local/logs/lev1"
#define H1LOGFILE "/usr/local/logs/lev1/build_lev1_mgr_%s.%s.log"
#define QSUBDIR "/surge40/jsocprod/qsub/tmp"
#define NUMTIMERS 8 //number of separate timers avail
#define MAXCPULEV1 32 //max# of forks can do at a time for stream mode
#define DEFAULTCPULEV1 "8" //default# of forks can do at a time
#define MAXQSUBLEV1 64 //max# of qsub can do at a time for reprocessing mode
#define DEFAULTQSUBLEV1 "16" //default# of qsub can do at a time
//Parenthesized so the product stays intact when the macro is used inside
//a larger expression (e.g. x / MAXJIDSTR).
#define MAXJIDSTR (MAXQSUBLEV1*16)
#define NOTSPECIFIED "***NOTSPECIFIED***" //default value marking a string arg as not given
#define LOGTEST 0
char args8sv[128]; //used when LOGTEST = 1
93
94
95 int qsubjob(long long rec1, long long rec2);
96
97 // List of default parameter values.
98 ModuleArgs_t module_args[] = {
99 {ARG_STRING, "instru", "hmi" , "instrument. either hmi,aia or iris"},
100 {ARG_STRING, "dsin", NOTSPECIFIED, "dataset of lev0 filtergrams"},
101 {ARG_STRING, "dsout", NOTSPECIFIED, "dataset of lev1 output"},
102 {ARG_STRING, "logfile", NOTSPECIFIED, "optional log file name. Will create one if not given"},
103 {ARG_INTS, "brec", "-1", "first lev0 rec# to process"},
104 {ARG_INTS, "erec", "-1", "last lev0 rec# to process"},
105 {ARG_INTS, "bfsn", "-1", "first lev0 fsn to process"},
106 {ARG_INTS, "efsn", "-1", "last lev0 fsn to process"},
107 {ARG_INTS, "quicklook", "0", "1=quick look, 0 = definitive mode"},
108 {ARG_INTS, "numrec", NUMRECLEV1S, "number of lev0 to lev1 records at a time"},
109 {ARG_INTS, "numcpu", DEFAULTCPULEV1, "max# of forks to do at a time for stream mode"},
110 {ARG_INTS, "numqsub", DEFAULTQSUBLEV1, "max# of qsub to do at a time for reprocessing mode"},
111 {ARG_FLAG, "v", "0", "verbose flag"},
112 {ARG_FLAG, "h", "0", "help flag"},
113 {ARG_END}
114 };
115
116 ModuleArgs_t *gModArgs = module_args;
117 CmdParams_t cmdparams;
118 // Module name presented to DRMS.
119 char *module_name = "build_lev1_mgr";
120
121 FILE *h1logfp; // fp for h1 ouput log for this run
122 FILE *qsubfp; // fp for qsub script
123 static char datestr[32];
124 static char open_dsname[256];
125 static struct timeval first[NUMTIMERS], second[NUMTIMERS];
126 static struct stat stbuf;
127
128 pid_t mypid;
129 uint64_t jid;
130 int verbose;
131 long long brec, erec, bfsn, efsn; //begin and end lev0 rec#/fsn to do.Must be same data type
132 long long lastrecnum0_now = 0; //last RT lev0 recnum seen
133 long long lastrecnum0_prev = 0; //prvious RT lev0 recnum seen
134 int numrec, numcpu, numqsub;
135 int qcnt = 0;
136 //stream_mode is also quick look mode, i.e. brec=erec=0
137 int stream_mode = 0; //0=qsub build_lev1, 1=fork it locally
138 int quicklook = 0; //can force this flg to be passed to build_lev1
139 //int hmiaiaflg = 0; //0=hmi, 1=aia
140 int instruflg = 0; //0=hmi, 1=aia, 2=iris
141 int modeflg = 0; //0=fsn, 1=recnum
142 int loopcnt = 0; //force last lev0 records to lev1
143 int abortnow = 0;
144 char stopfile[80]; // e.g. /usr/local/logs/lev1/build_mgr_stop_hmi
145 char hmiaianame[16]; // "hmi" or "aia" or "iris"
146 char logname[128];
147 char argdsin[128], argdsout[128], arglogfile[128], arginstru[80], argmode[80];
148 char argbx[80], argex[80], argnumrec[80], argnumcpu[80], argnumqsub[80];
149 char timetag[32];
150 char *username; // from getenv("USER")
151 char *logfile; // optional log name passed in
152 char *instru; // instument. hmi or aia
153 char *mode; // mode. recnum or fsn
154 char *dsin; // lev0 input dataset
155 char *dsout; // lev1 output dataset
156
157
158 int nice_intro ()
159 {
160 int usage = cmdparams_get_int (&cmdparams, "h", NULL);
161 if (usage)
162 {
163 printf ("Runs build_lev1 processes to create lev1 datasets.\n\n");
164 printf ("Usage: build_lev1_mgr [-vh]\n"
165 "mode=<recnum|fsn> instru=<hmi|aia|iris> dsin=<lev0> dsout=<lev1>\n"
166 "brec=<rec#>|bfsn=<fsn#> erec=<rec#>|efsn=<fsn#>\n"
167 "numcpu=<#> numqsub=<#> logfile=<file>\n"
168 " -h: help - show this message then exit\n"
169 " -v: verbose\n"
170 "mode= recnum: brec and erec have the record # range to process \n"
171 " fsn: bfsn and efsn have the fsn # range to process\n"
172 " For safety, the mode and arg name used must be consistent\n"
173 "instru= instrument. must be 'hmi' or 'aia' or 'iris'\n"
174 "dsin= data set name of lev0 input\n"
175 " default hmi=hmi.lev0e aia=aia.lev0e iris=iris_ground.lev0_dc1\n"
176 "dsout= data set name of lev1 output\n"
177 " default hmi=su_production.hmi_lev1e aia=su_production.aia_lev1e iris=iris_ground.lev1_dc1\n"
178 "brec= first lev0 rec# to process. 0=Stream Mode if erec also 0\n"
179 "erec= last lev0 rec# to process. 0=Stream Mode if brec also 0\n"
180 "bfsn= first fsn# to process. -1=error must be given if fsn mode\n"
181 "efsn= last fsn# to process. -1=error must be given if fsn mode\n"
182 "numcpu= max# of forks to do at a time for stream mode. Default %s\n"
183 "numqsub= max# of qsub to do at a time for reprocessing mode. Default %s\n"
184 "logfile= optional log file name. If not given uses:\n"
185 " /usr/local/logs/lev1/build_lev1_mgr.<time_stamp>.log\n",
186 DEFAULTCPULEV1, DEFAULTQSUBLEV1);
187 printf ("\n * Has two modes:\n"
188 " * Stream Mode (one instance):\n"
189 " * This is the normal quick look mode that runs continuously and\n"
190 " * keeps up with the lev0 records.\n"
191 " * mode=recnum, brec=0, erec=0\n"
192 " * - start at the previous highest lev0 record processed\n"
193 " * This is keep in the DB table lev1_highest_lev0_recnum\n"
194 " * - fork up to 8 (MAXCPULEV1) build_lev1 for every \n"
195 " * 12 (NUMRECLEV1) lev0 records. \n"
196 " * - when an build_lev1 completes, fork another for next 12 rec\n"
197 " * - if no more lev0 records available, sleep and try again\n"
198 " * - if 8 CPU not enough to keep up with lev0, go to 16, etc.\n"
199 " *\n"
200 " * Range Mode (any number of instances):\n"
201 " * brec=1000, erec=2000 or bfsn=44000, efsn=45000\n"
202 " * - qsub up to 16 (MAXQSUBLEV1) build_lev1 for 12 records ea\n"
203 " * - when a job completes qsub next 12 records until erec is reached\n"
204 " * - when all jobs are done, build_lev1_mgr will exit\n\n");
205 return(1);
206 }
207 verbose = cmdparams_get_int (&cmdparams, "v", NULL);
208 return (0);
209 }
210
/* Return the current local time in asctime() layout, e.g.
 * "Wed Jun 30 21:49:08 1993\n" (the trailing newline is part of the string).
 * The text lives in a function-local static buffer, so each call overwrites
 * the result of the previous one.
 */
char *get_datetime()
{
  static char stamp[32];
  struct timeval now;
  const time_t *secs;

  gettimeofday(&now, NULL);
  secs = (const time_t *)&now.tv_sec;
  sprintf(stamp, "%s", asctime(localtime(secs)));
  return stamp;
}
223
224 // Setup global datestr[] like: 2008.07.14_08:29:31
225 char *do_datestr() {
226 time_t tval;
227 struct tm *t_ptr;
228
229 tval = time(NULL);
230 t_ptr = localtime(&tval);
231 sprintf(datestr, "%d.%02d.%02d_%02d:%02d:%02d",
232 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1),
233 t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
234 return(datestr);
235 }
236
237 // Returns a time tag like yyyy.mm.dd.hhmmss
238 char *gettimetag()
239 {
240 struct timeval tvalr;
241 struct tm *t_ptr;
242
243 gettimeofday(&tvalr, NULL);
244 t_ptr = localtime((const time_t *)&tvalr);
245 sprintf(timetag, "%04d.%02d.%02d.%02d%02d%02d",
246 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1), t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
247 return(timetag);
248 }
249
250
251 void BeginTimer(int n)
252 {
253 gettimeofday (&first[n], NULL);
254 }
255
256 float EndTimer(int n)
257 {
258 gettimeofday (&second[n], NULL);
259 if (first[n].tv_usec > second[n].tv_usec) {
260 second[n].tv_usec += 1000000;
261 second[n].tv_sec--;
262 }
263 return (float) (second[n].tv_sec-first[n].tv_sec) +
264 (float) (second[n].tv_usec-first[n].tv_usec)/1000000.0;
265 }
266
267 // Outputs the variable format message (re: printf) to the log file.
268 int h1log(const char *fmt, ...)
269 {
270 va_list args;
271 char string[32768];
272
273 va_start(args, fmt);
274 vsprintf(string, fmt, args);
275 if(h1logfp) {
276 fprintf(h1logfp, string);
277 fflush(h1logfp);
278 }
279 else { // couldn't open log
280 printf(string); // also print to stdout
281 fflush(stdout);
282 }
283 va_end(args);
284 return(0);
285 }
286
287 int send_mail(char *fmt, ...)
288 {
289 va_list args;
290 char string[1024], cmd[1024];
291
292 va_start(args, fmt);
293 vsprintf(string, fmt, args);
294 sprintf(cmd, "echo \"%s\" | Mail -s \"%s mail\" lev0_user", string, module_name);
295 system(cmd);
296 va_end(args);
297 return(0);
298 }
299
300 // Got a fatal error.
301 void abortit(int stat)
302 {
303 char pcmd[128];
304
305 printk("***Abort in progress ...\n");
306 printk("**Exit build_lev1_mgr w/ status = %d\n", stat);
307 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
308 LEV1LOG_BASEDIR, hmiaianame);
309 system(pcmd);
310 if (h1logfp) fclose(h1logfp);
311 exit(stat);
312 }
313
314 /* This is stream mode processing that will keep on processing lev0
315 * records as they show up in the DB.
316 * Process the lev0 to lev1 from recn0 to maxrecn0.
317 * Returns when all children processes are done.
318 * Note: The processing is done in sets of 17 (NUMRECLEV1) lev0 records,
319 * so the maxrecn0 may not be reached, but it will
320 * get done with the next set when more lev0 records come in. forkstream()
321 * is run again and will automatically process new lev0 records in
322 * sets of 17 as they are seen in the DB.
323 * Returns non-0 on error.
324 * If force is set, then do any non-chunk size of lev0 records left.
325 */
326 int forkstream(long long recn0, long long maxrecn0, int force)
327 {
328 pid_t pid, wpid, fpid[MAXCPULEV1];
329 long long numofrecs, frec, lrec;
330 int stat_loc, i, j, k, l, numtofork;
331 char *args[11], pcmd[128];
332 char args1[128], args2[128], args3[128], args4[128], args5[128], args6[128];
333 char args7[128], args8[128], args9[128];
334
335 numofrecs = (maxrecn0 - recn0) + 1;
336 numtofork = numofrecs/numrec; //this many to fork
337 j = numtofork;
338 if(j == 0) {
339 if(force) { j=numtofork=1; }
340 else return(0);
341 }
342 lrec = recn0-1;
343 if(j < numcpu) l = j; //fork less then the max at a time
344 else l = numcpu; //fork the max at a time
345 if(LOGTEST) {
346 sprintf(args8sv, "logfile=%s/build_lev1.%s.log",
347 LEV1LOG_BASEDIR, gettimetag());
348 }
349 for(k=0; k < l; k++) { //fork this many to start
350 if(force) { frec = lrec+1; lrec = (frec + numofrecs)-1; }
351 else { frec = lrec+1; lrec = (frec + numrec)-1; }
352 if((pid = fork()) < 0) {
353 printk("***Can't fork(). errno=%d\n", errno);
354 return(1); //!!TBD decide what to do
355 }
356 else if(pid == 0) { //this is the beloved child
357 switch(instruflg) {
358 case 0:
359 args[0] = "build_lev1_hmi";
360 break;
361 case 1:
362 args[0] = "build_lev1_aia";
363 break;
364 case 2:
365 args[0] = "build_lev1_iris";
366 break;
367 }
368 sprintf(args1, "mode=%s", mode);
369 args[1] = args1;
370 sprintf(args2, "dsin=%s", dsin);
371 args[2] = args2;
372 sprintf(args3, "dsout=%s", dsout);
373 args[3] = args3;
374 if(modeflg) { //recnum mode
375 sprintf(args4, "brec=%lld", frec);
376 args[4] = args4;
377 sprintf(args5, "erec=%lld", lrec);
378 args[5] = args5;
379 }
380 else {
381 sprintf(args4, "bfsn=%lld", frec);
382 args[4] = args4;
383 sprintf(args5, "efsn=%lld", lrec);
384 args[5] = args5;
385 }
386 sprintf(args6, "instru=%s", instru);
387 args[6] = args6;
388 sprintf(args7, "quicklook=%d", quicklook);
389 args[7] = args7;
390 //sprintf(args8, "logfile=%s/l1_%s_%d.log", QSUBDIR, gettimetag(), k);
391 if(LOGTEST) {
392 sprintf(args8, "%s", args8sv);
393 }
394 else {
395 sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log",
396 LEV1LOG_BASEDIR, frec, lrec, hmiaianame);
397 }
398 args[8] = args8;
399 /***********The dsaiabad is obsolete***************************************
400 if(hmiaiaflg) { //aia has an extra arg to build_lev1
401 sprintf(args9, "dsaiabad=%s", DSAIABAD);
402 args[9] = args9;
403 args[10] = NULL;
404 printk("execvp: %s %s %s %s %s %s %s %s %s %s\n",
405 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8],args[9]);
406 }
407 else {
408 ***************************************************************************/
409 args[9] = NULL;
410 printk("execvp: %s %s %s %s %s %s %s %s %s\n",
411 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]);
412 //}
413 if(execvp(args[0], args) < 0) {
414 switch(instruflg) {
415 case 0:
416 printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno);
417 break;
418 case 1:
419 printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno);
420 break;
421 case 2:
422 printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno);
423 break;
424 }
425 exit(1);
426 }
427 }
428 --numtofork;
429 printf("forked pid = %d\n", pid);
430 fpid[k] = pid;
431 }
432 wpid = -1; //wait for any child
433 while(1) {
434 //don't block and report any status not yet reported
435 pid = waitpid(wpid, &stat_loc, WNOHANG+WUNTRACED);
436 //pid = waitpid(wpid, &stat_loc, WNOHANG);
437 //printf("!!TEMP waitpid returned %d stat_loc=%d\n", pid, stat_loc);
438 if(pid == 0) { sleep(5); continue; } //nothing ready
439 if(pid == -1) {
440 if(errno == ECHILD) printf("!!No More Children\n");errno=0;
441 //!!TBD assumes we catch up at some point and we know where we're at
442 //now and can update the DB table. Check that this is ok.
443 //now update lev1_highest_lev0_recnum table with lrec
444 sprintf(pcmd, "echo \"update lev1_highest_lev0_recnum set lev0recnum=%lld, date='%s' where lev0series='%s'\" | psql -h hmidb -U production jsoc", lrec, get_datetime(), dsin);
445 printf("%s\n", pcmd); //!!TEMP echo
446 system(pcmd);
447 if(stat(stopfile, &stbuf) == 0) {
448 printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile);
449 abortnow = 1; //don't wait for any more lev 0records
450 }
451 if(numtofork <= 0) break;
452 }
453 else {
454 for(k=0; k < numcpu; k++) { //make sure a good one replies and
455 if(fpid[k] == pid) { break; } //don't have to worry about wraparound
456 }
457 if(k == numcpu) continue; //extraneous pid get on first wait
458 printf("returned pid = %d stat_loc = %d\n", pid, stat_loc);
459 if(numtofork == 0) continue; //find out no more children
460 }
461 //fork one more unless the stopfile is seen
462 if(stat(stopfile, &stbuf) == 0) {
463 printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile);
464 numtofork = 0; //no new forks
465 abortnow = 1; //don't wait for any more lev 0records
466 continue;
467 }
468 frec = lrec+1; lrec = (frec + numrec)-1;
469 if((pid = fork()) < 0) {
470 printk("***Can't fork(). errno=%d\n", errno);
471 return(1); //!!TBD decide what to do
472 }
473 else if(pid == 0) { //this is the beloved child
474 switch(instruflg) {
475 case 0:
476 args[0] = "build_lev1_hmi";
477 break;
478 case 1:
479 args[0] = "build_lev1_aia";
480 break;
481 case 2:
482 args[0] = "build_lev1_iris";
483 break;
484 }
485 sprintf(args1, "mode=%s", mode);
486 args[1] = args1;
487 sprintf(args2, "dsin=%s", dsin);
488 args[2] = args2;
489 sprintf(args3, "dsout=%s", dsout);
490 args[3] = args3;
491 if(modeflg) { //recnum mode
492 sprintf(args4, "brec=%lld", frec);
493 args[4] = args4;
494 sprintf(args5, "erec=%lld", lrec);
495 args[5] = args5;
496 }
497 else {
498 sprintf(args4, "bfsn=%lld", frec);
499 args[4] = args4;
500 sprintf(args5, "efsn=%lld", lrec);
501 args[5] = args5;
502 }
503 sprintf(args6, "instru=%s", instru);
504 args[6] = args6;
505 sprintf(args7, "quicklook=%d", quicklook);
506 args[7] = args7;
507 if(LOGTEST) {
508 sprintf(args8, "%s", args8sv); //use original log name
509 }
510 else {
511 sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log",
512 LEV1LOG_BASEDIR, frec, lrec, hmiaianame);
513 }
514 args[8] = args8;
515 /***********The dsaiabad is obsolete***************************************
516 if(hmiaiaflg) { //aia has an extra arg to build_lev1
517 sprintf(args9, "dsaiabad=%s", DSAIABAD);
518 args[9] = args9;
519 args[10] = NULL;
520 printk("execvp: %s %s %s %s %s %s %s %s %s %s\n",
521 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8],args[9]);
522 }
523 else {
524 ***************************************************************************/
525 args[9] = NULL;
526 printk("execvp: %s %s %s %s %s %s %s %s %s\n",
527 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]);
528 //}
529 if(execvp(args[0], args) < 0) {
530 switch(instruflg) {
531 case 0:
532 printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno);
533 break;
534 case 1:
535 printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno);
536 break;
537 case 2:
538 printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno);
539 break;
540 }
541 exit(1);
542 }
543 }
544 --numtofork;
545 printf("forked pid = %d\n", pid);
546 fpid[k] = pid;
547 }
548 return(0);
549 }
550
551 //Start a qsub job. Set the global jid.
552 //The args are either recnum of fsn according to modeflg.
553 int qsubjob(long long rec1, long long rec2)
554 {
555 FILE *fin;
556 char astr[32], bstr[32], string[128], qlogname[128], qsubcmd[512];;
557 char recrange[128];
558
559 if(modeflg) sprintf(recrange, ":#%lld-#%lld", rec1, rec2);
560 else sprintf(recrange, "%lld-%lld", rec1, rec2);
561 sprintf(open_dsname, "%s[%s]", dsin, recrange);
562 printk("open_dsname = %s\n", open_dsname); //!!TEMP
563 //sprintf(qlogname, "%s/qsub_prod_%d_%d.csh", QSUBDIR, mypid, qcnt++);
564 sprintf(qlogname, "%s/p_%d_%d.csh", QSUBDIR, mypid, qcnt++);
565 if((qsubfp=fopen(qlogname, "w")) == NULL) {
566 fprintf(stderr, "**Can't open the qsub log file %s\n", qlogname);
567 return(1); //!!TBD
568 }
569 fprintf(qsubfp, "#!/bin/csh\n");
570 fprintf(qsubfp, "limit vm 1000M\n");
571 fprintf(qsubfp, "limit coredumpsize 0\n");
572 fprintf(qsubfp, "echo \"TMPDIR = $TMPDIR\"\n");
573
574 if(modeflg) { //recnum mode
575 switch(instruflg) {
576 case 0:
577 fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
578 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
579 break;
580 case 1:
581 fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
582 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
583 break;
584 case 2:
585 fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
586 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
587 break;
588 }
589 }
590 else { //fsn mode
591 switch(instruflg) {
592 case 0:
593 fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
594 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
595 break;
596 case 1:
597 fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
598 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
599 break;
600 case 2:
601 fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
602 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
603 break;
604 }
605 }
606 /********************Elim force of stream mode*******************************
607 if(modeflg) { //recnum mode
608 if(hmiaiaflg) { //aia has an extra arg to build_lev1
609 fprintf(qsubfp, "build_lev1_aia --loopconn mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
610 mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2);
611 }
612 else {
613 fprintf(qsubfp, "build_lev1_hmi --loopconn mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
614 mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2);
615 }
616 }
617 else { //fsn mode
618 if(hmiaiaflg) { //aia has an extra arg to build_lev1
619 fprintf(qsubfp, "build_lev1_aia --loopconn mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
620 mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2);
621 }
622 else {
623 fprintf(qsubfp, "build_lev1_hmi --loopconn mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
624 mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2);
625 }
626 }
627 ****************************************************************************/
628 fclose(qsubfp);
629 sprintf(qsubcmd, "qsub -o %s -e %s -q p.q %s",
630 QSUBDIR, QSUBDIR, qlogname); //note: uses p.q
631 //sprintf(qsubcmd, "qsub -o %s -e %s -q j.q %s",
632 // QSUBDIR, QSUBDIR, qlogname);
633 //sprintf(qsubcmd, "qsub -q j.q %s", qlogname);
634 printf("%s\n", qsubcmd);
635 printk("%s\n", qsubcmd);
636 /********!!TEMP noop out*********************************************/
637 sprintf(qsubcmd, "%s | grep \"Your job\"", qsubcmd);
638 fin = popen(qsubcmd, "r");
639 while(fgets(string, sizeof string, fin)) { //get qsub return line
640 sscanf(string, "%s %s %d", astr, bstr, &jid); // get job_id
641 }
642 pclose(fin);
643 printf("\$JOB_ID = %u\n", jid);
644 /********!!TEMP end noop out*********************************************/
645 return(0);
646 }
647
648 int qsubmode(long long frec, long long lrec)
649 {
650 FILE *fin;
651 char qsubcmd[512], string[128];
652 char astr[32], bstr[32], jidstr[MAXJIDSTR];
653 uint64_t qjid[MAXQSUBLEV1], qstatjid[MAXQSUBLEV1];
654 long long numofrecs, rfirst, rlast;
655 int numtoqsub, i, j, l, k, found, status;
656 int jobdone=0;
657
658 numofrecs = (lrec - frec) + 1;
659 numtoqsub = numofrecs/numrec; //this many to qsub
660 if(numofrecs % numrec) numtoqsub++;
661 j = numtoqsub; //0 implies one to qsub
662 rlast = frec-1;
663 if(j < numqsub) l = j; //qsub less then the max at a time
664 else l = numqsub; //qsub the max at a time
665 for(k=0; k < l; k++) { //qsub this many to start
666 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
667 if(rlast > lrec) rlast = lrec; //don't go past end
668 status = qsubjob(rfirst, rlast); //schedule the qsub job. set jid
669 --numtoqsub;
670 qjid[k] = jid;
671 if(k == 0) sprintf(jidstr, "%u", jid);
672 else sprintf(jidstr, "%s,%u", jidstr, jid);
673 }
674 //printf("jidstr = %s\n", jidstr); //!!TEMP
675 printf("numtoqsub left = %d\n", numtoqsub); //!!TEMP
676 //sprintf(qsubcmd, "qstat -j %s 2>/dev/null | grep \"job_number:\"", jidstr);
677 //sprintf(qsubcmd, "qstat -u production | grep \"qsub_prod_%d\"", mypid);
678 //the grep string is from qlogname set in qsubjob()
679 //sprintf(qsubcmd, "qstat -u production | grep \"p_%d_\"", mypid);
680 sprintf(qsubcmd, "qstat -u %s | grep \"p_%d_\"", username, mypid);
681 while(1) {
682 //printf("\ncmd: %s\n", qsubcmd); //!!TEMP
683 if(!(fin = popen(qsubcmd, "r"))) {
684 printf("!!!Fatal Error: can't do %s\n", qsubcmd);
685 return(1); //!!TBD check
686 }
687 //sleep(12);
688 found = 0; k = 0;
689 while(fgets(string, sizeof string, fin)) { //get qstat return line
690 //sscanf(string, "%s %u", astr, &jid); // get job_id
691 sscanf(string, "%u", &jid); // get job_id
692 printf("job id from qstat = %u\n", jid);
693 qstatjid[k++] = jid;
694 found = 1;
695 }
696 pclose(fin);
697
698 //now see if any of the started jobs are done
699 for(i=0; i < l; i++) {
700 for(j=0; j < k; j++) {
701 if(qjid[i] == qstatjid[j]) { //job still active
702 break;
703 }
704 }
705 if(j == k) { //job done. start a new one
706 if(qjid[i] != 0) {
707 printf("Job done jid=%u\n", qjid[i]);
708 jobdone++;
709 qjid[i] = 0;
710 }
711 if(numtoqsub) {
712 //start_new_qusb
713 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
714 if(rlast > lrec) rlast = lrec; //don't go past end
715 status = qsubjob(rfirst, rlast); //schedule the qsub job
716 --numtoqsub;
717 found = 1; //job to be found again
718 qjid[i] = jid;
719 //if(k == 0) sprintf(jidstr, "%u", jid);
720 //else sprintf(jidstr, "%s,%u", jidstr, jid);
721 }
722 //else break; //all done
723 }
724 }
725 for(i=0; i < l; i++) {
726 if(i == 0) sprintf(jidstr, "%u", qjid[i]);
727 else sprintf(jidstr, "%s,%u", jidstr, qjid[i]);
728 }
729 printf("\n");
730 if(!found) break;
731 sleep(3);
732 }
733 printf("All jobs done = %d. numtoqsub = %d\n", jobdone, numtoqsub);
734 return(0);
735 }
736
737 /* Create lev1 from lev0 records in either stream mode or
738 * range mode. Return non-0 on error.
739 * In stream mode force any non-chunk size of lev0 records at the
740 * end to lev1.
741 */
742 int do_ingest(int force)
743 {
744 FILE *fin;
745 int rstatus;
746 long long recnum0, maxrecnum0;
747 char string[128], pcmd[128];
748
749 if(stream_mode) { //start past last lev0 rec# processed
750 sprintf(pcmd, "echo \"select lev0recnum from lev1_highest_lev0_recnum where lev0series='%s'\" | psql -h hmidb -U production jsoc", dsin);
751 fin = popen(pcmd, "r");
752 while(fgets(string, sizeof string, fin)) { //get psql return line
753 if(strstr(string, "lev0recnum")) continue;
754 if(strstr(string, "-----")) continue;
755 //get lev0 rec#
756 if((rstatus = sscanf(string, "%lld", &recnum0)) == 0) {
757 printf("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
758 printk("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
759 abortit(1); //no rec#
760 }
761 recnum0++; //start at next rec#
762 break;
763 }
764 pclose(fin);
765 sprintf(pcmd, "echo \"select max(recnum) from %s\" | psql -h hmidb -U production jsoc", dsin);
766 fin = popen(pcmd, "r");
767 while(fgets(string, sizeof string, fin)) { //get psql return line
768 if(strstr(string, "max")) continue;
769 if(strstr(string, "-----")) continue;
770 if(!strcmp(string, " \n")) { //new series w/no recnum
771 printf("Abort no max lev0 recnum (new series?)\n");
772 printk("Abort no max lev0 recnum (new series?)\n");
773 abortit(1);
774 }
775 sscanf(string, "%lld", &maxrecnum0);
776 //maxrecnum0 = maxrecnum0 - 25; //allow time for commit of lev0
777 //maxrecnum0 = maxrecnum0 - 40; //allow (more) time for commit of lev0
778 if(instruflg == 2) maxrecnum0 = maxrecnum0 - 200; //allow (more) time for iris
779 else maxrecnum0 = maxrecnum0 - 80; //allow (more) time for commit of lev0
780 lastrecnum0_prev = lastrecnum0_now;
781 lastrecnum0_now = maxrecnum0; //save to see if more come in
782 break;
783 }
784 pclose(fin);
785 printf("Stream Mode starting at lev0 recnum = %lld maxrecnum = %lld\n",
786 recnum0, maxrecnum0);
787 if(recnum0 > maxrecnum0) return(0); //nothing to do. go wait
788 rstatus = forkstream(recnum0, maxrecnum0, force);
789 }
790 else {
791 if(modeflg) { //recnum mode
792 //range mode. use brec/erec and qsub build_lev1 programs
793 rstatus = qsubmode(brec, erec);
794 }
795 else { //fsn mode
796 rstatus = qsubmode(bfsn, efsn);
797 }
798 }
799 return(rstatus);
800 }
801
802 void sighandler(sig)
803 int sig;
804 {
805 char pcmd[128];
806 if(sig == SIGTERM) {
807 printf("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr);
808 printk("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr);
809 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
810 LEV1LOG_BASEDIR, hmiaianame);
811 system(pcmd);
812 exit(1);
813 }
814 if(sig == SIGINT) {
815 printf("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr);
816 printk("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr);
817 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
818 LEV1LOG_BASEDIR, hmiaianame);
819 system(pcmd);
820 exit(1);
821 }
822 printk("*** %s build_lev1_mgr got an illegal signal %d, ignoring...\n",
823 datestr, sig);
824 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
825 signal(SIGINT, sighandler);
826 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
827 signal(SIGALRM, sighandler);
828 }
829
830 // Initial setup stuff called when main is first entered.
831 void setup()
832 {
833 FILE *fin;
834 char string[128], cwdbuf[128], idstr[256], lfile[128];
835 int tpid;
836
837 do_datestr();
838 printk_set(h1log, h1log); // set for printk calls
839 printk("%s\n", datestr);
840 getcwd(cwdbuf, 126);
841 mypid = getpid();
842 sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf);
843 sprintf(string, "%s started as pid=%d user=%s\n", module_name, mypid, username);
844 strcat(idstr, string);
845 printk("%s", idstr);
846 printf("%s", idstr);
847 switch(instruflg) {
848 case 0:
849 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_hmi");
850 break;
851 case 1:
852 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_aia");
853 break;
854 case 2:
855 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_iris");
856 break;
857 }
858 if(stream_mode) {
859 sprintf(string, "/bin/rm -f %s", stopfile); //remove any stop file
860 system(string);
861 }
862 sprintf(argmode, "mode=%s", mode);
863 sprintf(arginstru, "instru=%s", instru);
864 sprintf(argdsin, "dsin=%s", dsin);
865 sprintf(argdsout, "dsout=%s", dsout);
866 if(modeflg) {
867 sprintf(argbx, "brec=%lld", brec);
868 sprintf(argex, "erec=%lld", erec);
869 }
870 else {
871 sprintf(argbx, "bfsn=%lld", bfsn);
872 sprintf(argex, "efsn=%lld", efsn);
873 }
874 sprintf(argnumrec, "numrec=%d", numrec);
875 sprintf(argnumcpu, "numcpu=%d", numcpu);
876 sprintf(argnumqsub, "numqsub=%d", numqsub);
877 sprintf(arglogfile, "logfile=%s", logname);
878 printk("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub);
879 printf("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub);
880 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
881 signal(SIGINT, sighandler);
882 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
883 signal(SIGTERM, sighandler);
884 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
885 signal(SIGALRM, sighandler);
886 sprintf(idstr, "ps -ef | grep %s", LEV1VIEWERNAME);
887 fin = popen(idstr, "r");
888 while(fgets(string, sizeof string, fin)) { //get ps line
889 if(!(strstr(string, "perl"))) continue;
890 sscanf(string, "%s %d", idstr, &tpid); // get user name & process id
891 sprintf(lfile, "%s/build_lev1_mgr_restart_%d.touch", LEV1LOG_BASEDIR, tpid);
892 sprintf(idstr, "/bin/touch %s", lfile);
893 printk("%s\n", idstr);
894 system(idstr);
895 }
896 umask(002); // allow group write
897 }
898
899 // Module main function.
900 int main(int argc, char **argv)
901 {
902 FILE *fin;
903 int wflg = 1;
904 char line[128], pcmd[128];
905
906 if (cmdparams_parse(&cmdparams, argc, argv) == -1)
907 {
908 fprintf(stderr,"Error: Command line parsing failed. Aborting.\n");
909 return 1;
910 }
911 if (nice_intro())
912 return (0);
913 if(!(username = (char *)getenv("USER"))) username = "nouser";
914 instru = cmdparams_get_str(&cmdparams, "instru", NULL);
915 if(strcmp(instru, "hmi") && strcmp(instru, "aia") && strcmp(instru, "iris")) {
916 printf("instru= %s\n", instru); //!!TEMP
917 printf("Error: instru= must be given as 'hmi' or 'aia' or 'iris'\n");
918 return(0);
919 }
920 if(!strcmp(instru, "aia")) {
921 instruflg = 1;
922 sprintf(hmiaianame, "aia");
923 }
924 else if(!strcmp(instru, "hmi")) {
925 instruflg = 0;
926 sprintf(hmiaianame, "hmi");
927 }
928 else {
929 instruflg = 2;
930 sprintf(hmiaianame, "iris");
931 }
932 mode = cmdparams_get_str(&cmdparams, "mode", NULL);
933 if(strcmp(mode, "recnum") && strcmp(mode, "fsn")) {
934 printf("Error: mode= must be given as 'recnum' or 'fsn'\n");
935 return(0);
936 }
937 if(!strcmp(mode, "recnum")) modeflg = 1;
938 dsin = cmdparams_get_str(&cmdparams, "dsin", NULL);
939 dsout = cmdparams_get_str(&cmdparams, "dsout", NULL);
940 brec = cmdparams_get_int(&cmdparams, "brec", NULL);
941 erec = cmdparams_get_int(&cmdparams, "erec", NULL);
942 bfsn = cmdparams_get_int(&cmdparams, "bfsn", NULL);
943 efsn = cmdparams_get_int(&cmdparams, "efsn", NULL);
944 quicklook = cmdparams_get_int(&cmdparams, "quicklook", NULL);
945 numrec = cmdparams_get_int(&cmdparams, "numrec", NULL);
946 numcpu = cmdparams_get_int(&cmdparams, "numcpu", NULL);
947 numqsub = cmdparams_get_int(&cmdparams, "numqsub", NULL);
948 if(numcpu > MAXCPULEV1) {
949 printf("numcpu exceeds max of %d\n", MAXCPULEV1);
950 return(0);
951 }
952 if(numqsub > MAXQSUBLEV1) {
953 printf("numqsub exceeds max of %d\n", MAXQSUBLEV1);
954 return(0);
955 }
956 if(modeflg) { //recnum mode
957 if(brec == -1 || erec == -1) {
958 fprintf(stderr, "brec and erec must be given. -1 not allowed\n");
959 fprintf(stderr, "use brec=0 erec=0 to process in stream mode\n");
960 return(0);
961 }
962 if(brec > erec) {
963 fprintf(stderr, "brec must be <= erec\n");
964 return(0);
965 }
966 if(brec == 0 && erec == 0) {
967 //make sure there isn't a stream mode already running
968 sprintf(pcmd, "ls %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
969 LEV1LOG_BASEDIR, hmiaianame);
970 fin = popen(pcmd, "r");
971 while(fgets(line, sizeof line, fin)) { //get ps return line
972 printf("Error: build_lev1_mgr already running.");
973 printf(" If not so, do:\n");
974 printf("/bin/rm %s/build_lev1_mgr_%s.stream.touch\n",
975 LEV1LOG_BASEDIR, hmiaianame);
976 pclose(fin);
977 return(0);
978 }
979 pclose(fin);
980 sprintf(pcmd, "touch %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
981 LEV1LOG_BASEDIR, hmiaianame);
982 system(pcmd);
983 stream_mode = 1; //aka quick look mode
984 quicklook = 1; //and force quicklook to 1
985 }
986 }
987 else { //fsn mode
988 if(bfsn == -1 || efsn == -1) {
989 fprintf(stderr, "bfsn and efsn must be given. -1 not allowed\n");
990 return(0);
991 }
992 if(bfsn == 0 || efsn == 0) {
993 fprintf(stderr, "bfsn and efsn must be given. 0 not allowed\n");
994 return(0);
995 }
996 if(bfsn > efsn) {
997 fprintf(stderr, "bfsn must be <= efsn\n");
998 return(0);
999 }
1000 }
1001 logfile = cmdparams_get_str(&cmdparams, "logfile", NULL);
1002 if (strcmp(dsin, NOTSPECIFIED) == 0) {
1003 switch(instruflg) {
1004 case 0:
1005 dsin = LEV0SERIESNAMEHMI;
1006 break;
1007 case 1:
1008 dsin = LEV0SERIESNAMEAIA;
1009 break;
1010 case 2:
1011 dsin = LEV0SERIESNAMEIRIS;
1012 break;
1013 }
1014 }
1015 if (strcmp(dsout, NOTSPECIFIED) == 0) {
1016 switch(instruflg) {
1017 case 0:
1018 dsout = LEV1SERIESNAMEHMI;
1019 break;
1020 case 1:
1021 dsout = LEV1SERIESNAMEAIA;
1022 break;
1023 case 2:
1024 dsout = LEV1SERIESNAMEIRIS;
1025 break;
1026 }
1027 }
1028 if (strcmp(logfile, NOTSPECIFIED) == 0) {
1029 sprintf(logname, H1LOGFILE, hmiaianame, gettimetag());
1030 }
1031 else {
1032 sprintf(logname, "%s", logfile);
1033 }
1034 if((h1logfp=fopen(logname, "w")) == NULL)
1035 fprintf(stderr, "**Can't open the log file %s\n", logname);
1036 setup();
1037 while(wflg) {
1038 if(do_ingest(0)) { // loop to get files from the lev0
1039 printk("**ERROR: in do_ingest() for %s\n", open_dsname);
1040 }
1041 if(!stream_mode) return(0); //all done for reprocessing
1042 if(abortnow) break;
1043 sleep(30); //wait for more lev0 to appear
1044 if(lastrecnum0_now == lastrecnum0_prev) { //no new lev0 in
1045 if(loopcnt++ == 15) { //force any left over lev0 records to lev1
1046 printk("Timeout: force any left over lev0 to lev1\n");
1047 if(do_ingest(1)) { // process the last lev0 records
1048 printk("**ERROR: in do_ingest() for %s\n", open_dsname);
1049 }
1050 loopcnt = 0;
1051 }
1052 }
1053 else loopcnt = 0;
1054 //wflg = 0;
1055 }
1056 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
1057 LEV1LOG_BASEDIR, hmiaianame);
1058 system(pcmd);
1059 return(0);
1060 }
1061