1 |
/*----------------------------------------------------------------------------- |
2 |
* cvs/JSOC/proj/lev0/apps/build_lev1_mgr.c |
3 |
*----------------------------------------------------------------------------- |
4 |
* |
5 |
* This is a stand alone module that processes lev0 |
6 |
* filtergrams to lev1 by running the build_lev1 module. |
7 |
* (See "Normal Calls:" below for normal running calls) |
8 |
* |
9 |
*build_lev1_mgr |
10 |
* mode= recnum or fsn |
11 |
* instru= hmi or aia |
12 |
* dsin= default hmi.lev0e or aia.lev0e |
13 |
* dsout= hmi.lev1e or aia.lev1e |
14 |
* brec= first lev0 record# to process |
15 |
* erec= last lev0 record# to process |
16 |
* or |
17 |
* bfsn= first lev0 fsn# to process |
18 |
* efsn= last lev0 fsn# to process |
19 |
* |
20 |
* Runs build_lev1 processes to create lev1 datasets |
21 |
* Has a mode to specify lev0 input by record# or fsn. Also if in |
22 |
* mode=recnum and brec=0 and erec=0 then runs in Stream Mode: |
23 |
* Stream Mode (one instance): |
24 |
* This is the normal quick look mode that runs continuously and |
25 |
* keeps up with the lev0 records. |
26 |
* brec=0, erec=0 |
27 |
* - start at the previous highest lev0 record processed |
28 |
* This is kept in the DB table lev1_highest_lev0_recnum |
29 |
* - fork from 8 to MAXCPULEV1 build_lev1 for every |
30 |
* 12 (NUMRECLEV1) lev0 records. |
31 |
* - when a build_lev1 completes, fork another for the next 12 records |
32 |
* - if no more lev0 records available, sleep and try again |
33 |
* - if 8 CPU not enough to keep up with lev0, go to 16, etc. |
34 |
* |
35 |
* Recnum/fsn range Mode (any number of instances): |
36 |
* This is lev1 processing by a lev0 range. Does consistency check for mode= and |
37 |
* the use of brec/erec or bfsn/efsn, e.g.: |
38 |
* brec=1000, erec=2000 or bfsn=4836500, efsn=4836590 |
39 |
* - qsub up to 16 (MAXQSUBLEV1) build_lev1 for 12 records ea |
40 |
* - when a job completes qsub next 12 records until erec or efsn is reached |
41 |
* - when all jobs are done, build_lev1_mgr will exit |
42 |
* |
43 |
*/ |
44 |
/* Normal Calls: |
45 |
* For hmi call on cl1n002 as user jsocprod: |
46 |
* build_lev1_mgr mode=recnum instru=hmi dsin=hmi.lev0a dsout=hmi.lev1_nrt |
47 |
* brec=0 erec=0 |
48 |
* For aia call on cl1n003 as user jsocprod: |
49 |
* build_lev1_mgr mode=recnum instru=aia dsin=aia.lev0 dsout=aia.lev1_nrt2 |
50 |
* brec=0 erec=0 |
51 |
* For iris call on cl1n001 as user jsocprod: |
52 |
* build_lev1_mgr mode=recnum instru=iris dsin=iris_ground.lev0_dc1 |
53 |
* dsout=iris_ground.lev1_dc1 brec=0 erec=0 |
54 |
*/ |
55 |
|
56 |
#include <jsoc.h> |
57 |
#include <cmdparams.h> |
58 |
#include <drms.h> |
59 |
#include <stdio.h> |
60 |
#include <stdlib.h> |
61 |
#include <strings.h> |
62 |
#include <signal.h> |
63 |
#include <sys/types.h> |
64 |
#include <sys/time.h> |
65 |
#include <sys/stat.h> //for umask(2) |
66 |
#include <unistd.h> //for alarm(2) among other things... |
67 |
#include <printk.h> |
68 |
#include <errno.h> |
69 |
#include <sys/wait.h> |
70 |
#include "lev0lev1.h" //defines NUMRECLEV1. Used by this and build_lev1.c |
71 |
|
72 |
//default in and out data series
#define LEV0SERIESNAMEHMI "hmi.lev0e"
#define LEV0SERIESNAMEAIA "aia.lev0e"
#define LEV0SERIESNAMEIRIS "iris_ground.lev0_dc1"
#define LEV1SERIESNAMEHMI "su_production.hmi_lev1e"	//temp test case
#define LEV1SERIESNAMEAIA "su_production.aia_lev1e"	//temp test case
#define LEV1SERIESNAMEIRIS "iris_ground.lev1_dc1"
#define DSAIABAD "lm_jps.aia_bad_blobs" //dsaiabad= arg value to build_lev1 for aia. Eliminated by jim on 03Mar2011

#define LEV1LOG_BASEDIR "/usr/local/logs/lev1"
#define H1LOGFILE "/usr/local/logs/lev1/build_lev1_mgr_%s.%s.log"
#define QSUBDIR "/surge40/jsocprod/qsub/tmp"
#define NUMTIMERS 8		//number of separate timers avail
#define MAXCPULEV1 32		//max# of forks can do at a time for stream mode
#define DEFAULTCPULEV1 "8"	//default# of forks can do at a time (string: used as an arg default)
#define MAXQSUBLEV1 64		//max# of qsub can do at a time for reprocessing mode
#define DEFAULTQSUBLEV1 "16"	//default# of qsub at a time (string: used as an arg default)
//Parenthesized so the macro expands correctly inside larger expressions
//(e.g. x / MAXJIDSTR would otherwise parse as (x / MAXQSUBLEV1) * 16).
#define MAXJIDSTR (MAXQSUBLEV1*16)
#define NOTSPECIFIED "***NOTSPECIFIED***"
#define LOGTEST 0
char args8sv[128];		//used when LOGTEST = 1

int qsubjob(long long rec1, long long rec2);
96 |
|
97 |
// List of default parameter values. |
98 |
ModuleArgs_t module_args[] = { |
99 |
{ARG_STRING, "instru", "hmi" , "instrument. either hmi,aia or iris"}, |
100 |
{ARG_STRING, "dsin", NOTSPECIFIED, "dataset of lev0 filtergrams"}, |
101 |
{ARG_STRING, "dsout", NOTSPECIFIED, "dataset of lev1 output"}, |
102 |
{ARG_STRING, "logfile", NOTSPECIFIED, "optional log file name. Will create one if not given"}, |
103 |
{ARG_INTS, "brec", "-1", "first lev0 rec# to process"}, |
104 |
{ARG_INTS, "erec", "-1", "last lev0 rec# to process"}, |
105 |
{ARG_INTS, "bfsn", "-1", "first lev0 fsn to process"}, |
106 |
{ARG_INTS, "efsn", "-1", "last lev0 fsn to process"}, |
107 |
{ARG_INTS, "quicklook", "0", "1=quick look, 0 = definitive mode"}, |
108 |
{ARG_INTS, "numrec", NUMRECLEV1S, "number of lev0 to lev1 records at a time"}, |
109 |
{ARG_INTS, "numcpu", DEFAULTCPULEV1, "max# of forks to do at a time for stream mode"}, |
110 |
{ARG_INTS, "numqsub", DEFAULTQSUBLEV1, "max# of qsub to do at a time for reprocessing mode"}, |
111 |
{ARG_FLAG, "v", "0", "verbose flag"}, |
112 |
{ARG_FLAG, "h", "0", "help flag"}, |
113 |
{ARG_END} |
114 |
}; |
115 |
|
116 |
ModuleArgs_t *gModArgs = module_args; |
117 |
CmdParams_t cmdparams; |
118 |
// Module name presented to DRMS. |
119 |
char *module_name = "build_lev1_mgr"; |
120 |
|
121 |
FILE *h1logfp; // fp for h1 ouput log for this run |
122 |
FILE *qsubfp; // fp for qsub script |
123 |
static char datestr[32]; |
124 |
static char open_dsname[256]; |
125 |
static struct timeval first[NUMTIMERS], second[NUMTIMERS]; |
126 |
static struct stat stbuf; |
127 |
|
128 |
pid_t mypid; |
129 |
uint64_t jid; |
130 |
int verbose; |
131 |
long long brec, erec, bfsn, efsn; //begin and end lev0 rec#/fsn to do.Must be same data type |
132 |
long long lastrecnum0_now = 0; //last RT lev0 recnum seen |
133 |
long long lastrecnum0_prev = 0; //prvious RT lev0 recnum seen |
134 |
int numrec, numcpu, numqsub; |
135 |
int qcnt = 0; |
136 |
//stream_mode is also quick look mode, i.e. brec=erec=0 |
137 |
int stream_mode = 0; //0=qsub build_lev1, 1=fork it locally |
138 |
int quicklook = 0; //can force this flg to be passed to build_lev1 |
139 |
//int hmiaiaflg = 0; //0=hmi, 1=aia |
140 |
int instruflg = 0; //0=hmi, 1=aia, 2=iris |
141 |
int modeflg = 0; //0=fsn, 1=recnum |
142 |
int loopcnt = 0; //force last lev0 records to lev1 |
143 |
int abortnow = 0; |
144 |
char stopfile[80]; // e.g. /usr/local/logs/lev1/build_mgr_stop_hmi |
145 |
char hmiaianame[16]; // "hmi" or "aia" or "iris" |
146 |
char logname[128]; |
147 |
char argdsin[128], argdsout[128], arglogfile[128], arginstru[80], argmode[80]; |
148 |
char argbx[80], argex[80], argnumrec[80], argnumcpu[80], argnumqsub[80]; |
149 |
char timetag[32]; |
150 |
char *username; // from getenv("USER") |
151 |
char *logfile; // optional log name passed in |
152 |
char *instru; // instument. hmi or aia |
153 |
char *mode; // mode. recnum or fsn |
154 |
char *dsin; // lev0 input dataset |
155 |
char *dsout; // lev1 output dataset |
156 |
|
157 |
|
158 |
int nice_intro () |
159 |
{ |
160 |
int usage = cmdparams_get_int (&cmdparams, "h", NULL); |
161 |
if (usage) |
162 |
{ |
163 |
printf ("Runs build_lev1 processes to create lev1 datasets.\n\n"); |
164 |
printf ("Usage: build_lev1_mgr [-vh]\n" |
165 |
"mode=<recnum|fsn> instru=<hmi|aia|iris> dsin=<lev0> dsout=<lev1>\n" |
166 |
"brec=<rec#>|bfsn=<fsn#> erec=<rec#>|efsn=<fsn#>\n" |
167 |
"numcpu=<#> numqsub=<#> logfile=<file>\n" |
168 |
" -h: help - show this message then exit\n" |
169 |
" -v: verbose\n" |
170 |
"mode= recnum: brec and erec have the record # range to process \n" |
171 |
" fsn: bfsn and efsn have the fsn # range to process\n" |
172 |
" For safety, the mode and arg name used must be consistent\n" |
173 |
"instru= instrument. must be 'hmi' or 'aia' or 'iris'\n" |
174 |
"dsin= data set name of lev0 input\n" |
175 |
" default hmi=hmi.lev0e aia=aia.lev0e iris=iris_ground.lev0_dc1\n" |
176 |
"dsout= data set name of lev1 output\n" |
177 |
" default hmi=su_production.hmi_lev1e aia=su_production.aia_lev1e iris=iris_ground.lev1_dc1\n" |
178 |
"brec= first lev0 rec# to process. 0=Stream Mode if erec also 0\n" |
179 |
"erec= last lev0 rec# to process. 0=Stream Mode if brec also 0\n" |
180 |
"bfsn= first fsn# to process. -1=error must be given if fsn mode\n" |
181 |
"efsn= last fsn# to process. -1=error must be given if fsn mode\n" |
182 |
"numcpu= max# of forks to do at a time for stream mode. Default %s\n" |
183 |
"numqsub= max# of qsub to do at a time for reprocessing mode. Default %s\n" |
184 |
"logfile= optional log file name. If not given uses:\n" |
185 |
" /usr/local/logs/lev1/build_lev1_mgr.<time_stamp>.log\n", |
186 |
DEFAULTCPULEV1, DEFAULTQSUBLEV1); |
187 |
printf ("\n * Has two modes:\n" |
188 |
" * Stream Mode (one instance):\n" |
189 |
" * This is the normal quick look mode that runs continuously and\n" |
190 |
" * keeps up with the lev0 records.\n" |
191 |
" * mode=recnum, brec=0, erec=0\n" |
192 |
" * - start at the previous highest lev0 record processed\n" |
193 |
" * This is keep in the DB table lev1_highest_lev0_recnum\n" |
194 |
" * - fork up to 8 (MAXCPULEV1) build_lev1 for every \n" |
195 |
" * 12 (NUMRECLEV1) lev0 records. \n" |
196 |
" * - when an build_lev1 completes, fork another for next 12 rec\n" |
197 |
" * - if no more lev0 records available, sleep and try again\n" |
198 |
" * - if 8 CPU not enough to keep up with lev0, go to 16, etc.\n" |
199 |
" *\n" |
200 |
" * Range Mode (any number of instances):\n" |
201 |
" * brec=1000, erec=2000 or bfsn=44000, efsn=45000\n" |
202 |
" * - qsub up to 16 (MAXQSUBLEV1) build_lev1 for 12 records ea\n" |
203 |
" * - when a job completes qsub next 12 records until erec is reached\n" |
204 |
" * - when all jobs are done, build_lev1_mgr will exit\n\n"); |
205 |
return(1); |
206 |
} |
207 |
verbose = cmdparams_get_int (&cmdparams, "v", NULL); |
208 |
return (0); |
209 |
} |
210 |
|
211 |
/* Return the current local time in asctime() form, for example
 * "Wed Jun 30 21:49:08 1993\n" (note the trailing newline).
 * The text lives in a function-static buffer that every call overwrites.
 */
char *get_datetime()
{
  static char datestr[32];
  struct timeval now;
  time_t secs;

  gettimeofday(&now, NULL);
  secs = now.tv_sec;
  snprintf(datestr, sizeof datestr, "%s", asctime(localtime(&secs)));
  return datestr;
}
223 |
|
224 |
// Setup global datestr[] like: 2008.07.14_08:29:31 |
225 |
char *do_datestr() { |
226 |
time_t tval; |
227 |
struct tm *t_ptr; |
228 |
|
229 |
tval = time(NULL); |
230 |
t_ptr = localtime(&tval); |
231 |
sprintf(datestr, "%d.%02d.%02d_%02d:%02d:%02d", |
232 |
(t_ptr->tm_year+1900), (t_ptr->tm_mon+1), |
233 |
t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec); |
234 |
return(datestr); |
235 |
} |
236 |
|
237 |
// Returns a time tag like yyyy.mm.dd.hhmmss |
238 |
char *gettimetag() |
239 |
{ |
240 |
struct timeval tvalr; |
241 |
struct tm *t_ptr; |
242 |
|
243 |
gettimeofday(&tvalr, NULL); |
244 |
t_ptr = localtime((const time_t *)&tvalr); |
245 |
sprintf(timetag, "%04d.%02d.%02d.%02d%02d%02d", |
246 |
(t_ptr->tm_year+1900), (t_ptr->tm_mon+1), t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec); |
247 |
return(timetag); |
248 |
} |
249 |
|
250 |
|
251 |
void BeginTimer(int n) |
252 |
{ |
253 |
gettimeofday (&first[n], NULL); |
254 |
} |
255 |
|
256 |
float EndTimer(int n) |
257 |
{ |
258 |
gettimeofday (&second[n], NULL); |
259 |
if (first[n].tv_usec > second[n].tv_usec) { |
260 |
second[n].tv_usec += 1000000; |
261 |
second[n].tv_sec--; |
262 |
} |
263 |
return (float) (second[n].tv_sec-first[n].tv_sec) + |
264 |
(float) (second[n].tv_usec-first[n].tv_usec)/1000000.0; |
265 |
} |
266 |
|
267 |
// Outputs the variable format message (re: printf) to the log file. |
268 |
int h1log(const char *fmt, ...) |
269 |
{ |
270 |
va_list args; |
271 |
char string[32768]; |
272 |
|
273 |
va_start(args, fmt); |
274 |
vsprintf(string, fmt, args); |
275 |
if(h1logfp) { |
276 |
fprintf(h1logfp, string); |
277 |
fflush(h1logfp); |
278 |
} |
279 |
else { // couldn't open log |
280 |
printf(string); // also print to stdout |
281 |
fflush(stdout); |
282 |
} |
283 |
va_end(args); |
284 |
return(0); |
285 |
} |
286 |
|
287 |
int send_mail(char *fmt, ...) |
288 |
{ |
289 |
va_list args; |
290 |
char string[1024], cmd[1024]; |
291 |
|
292 |
va_start(args, fmt); |
293 |
vsprintf(string, fmt, args); |
294 |
sprintf(cmd, "echo \"%s\" | Mail -s \"%s mail\" lev0_user", string, module_name); |
295 |
system(cmd); |
296 |
va_end(args); |
297 |
return(0); |
298 |
} |
299 |
|
300 |
// Got a fatal error. |
301 |
void abortit(int stat) |
302 |
{ |
303 |
char pcmd[128]; |
304 |
|
305 |
printk("***Abort in progress ...\n"); |
306 |
printk("**Exit build_lev1_mgr w/ status = %d\n", stat); |
307 |
sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
308 |
LEV1LOG_BASEDIR, hmiaianame); |
309 |
system(pcmd); |
310 |
if (h1logfp) fclose(h1logfp); |
311 |
exit(stat); |
312 |
} |
313 |
|
314 |
/* This is stream mode processing that will keep on processing lev0 |
315 |
* records as they show up in the DB. |
316 |
* Process the lev0 to lev1 from recn0 to maxrecn0. |
317 |
* Returns when all children processes are done. |
318 |
* Note: The processing is done in sets of 17 (NUMRECLEV1) lev0 records, |
319 |
* so the maxrecn0 may not be reached, but it will |
320 |
* get done with the next set when more lev0 records come in. forkstream() |
321 |
* is run again and will automatically process new lev0 records in |
322 |
* sets of 17 as they are seen in the DB. |
323 |
* Returns non-0 on error. |
324 |
* If force is set, then do any non-chunk size of lev0 records left. |
325 |
*/ |
326 |
int forkstream(long long recn0, long long maxrecn0, int force) |
327 |
{ |
328 |
pid_t pid, wpid, fpid[MAXCPULEV1]; |
329 |
long long numofrecs, frec, lrec; |
330 |
int stat_loc, i, j, k, l, numtofork; |
331 |
char *args[11], pcmd[128]; |
332 |
char args1[128], args2[128], args3[128], args4[128], args5[128], args6[128]; |
333 |
char args7[128], args8[128], args9[128]; |
334 |
|
335 |
numofrecs = (maxrecn0 - recn0) + 1; |
336 |
numtofork = numofrecs/numrec; //this many to fork |
337 |
j = numtofork; |
338 |
if(j == 0) { |
339 |
if(force) { j=numtofork=1; } |
340 |
else return(0); |
341 |
} |
342 |
lrec = recn0-1; |
343 |
if(j < numcpu) l = j; //fork less then the max at a time |
344 |
else l = numcpu; //fork the max at a time |
345 |
if(LOGTEST) { |
346 |
sprintf(args8sv, "logfile=%s/build_lev1.%s.log", |
347 |
LEV1LOG_BASEDIR, gettimetag()); |
348 |
} |
349 |
for(k=0; k < l; k++) { //fork this many to start |
350 |
if(force) { frec = lrec+1; lrec = (frec + numofrecs)-1; } |
351 |
else { frec = lrec+1; lrec = (frec + numrec)-1; } |
352 |
if((pid = fork()) < 0) { |
353 |
printk("***Can't fork(). errno=%d\n", errno); |
354 |
return(1); //!!TBD decide what to do |
355 |
} |
356 |
else if(pid == 0) { //this is the beloved child |
357 |
switch(instruflg) { |
358 |
case 0: |
359 |
args[0] = "build_lev1_hmi"; |
360 |
break; |
361 |
case 1: |
362 |
args[0] = "build_lev1_aia"; |
363 |
break; |
364 |
case 2: |
365 |
args[0] = "build_lev1_iris"; |
366 |
break; |
367 |
} |
368 |
sprintf(args1, "mode=%s", mode); |
369 |
args[1] = args1; |
370 |
sprintf(args2, "dsin=%s", dsin); |
371 |
args[2] = args2; |
372 |
sprintf(args3, "dsout=%s", dsout); |
373 |
args[3] = args3; |
374 |
if(modeflg) { //recnum mode |
375 |
sprintf(args4, "brec=%lld", frec); |
376 |
args[4] = args4; |
377 |
sprintf(args5, "erec=%lld", lrec); |
378 |
args[5] = args5; |
379 |
} |
380 |
else { |
381 |
sprintf(args4, "bfsn=%lld", frec); |
382 |
args[4] = args4; |
383 |
sprintf(args5, "efsn=%lld", lrec); |
384 |
args[5] = args5; |
385 |
} |
386 |
sprintf(args6, "instru=%s", instru); |
387 |
args[6] = args6; |
388 |
sprintf(args7, "quicklook=%d", quicklook); |
389 |
args[7] = args7; |
390 |
//sprintf(args8, "logfile=%s/l1_%s_%d.log", QSUBDIR, gettimetag(), k); |
391 |
if(LOGTEST) { |
392 |
sprintf(args8, "%s", args8sv); |
393 |
} |
394 |
else { |
395 |
sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log", |
396 |
LEV1LOG_BASEDIR, frec, lrec, hmiaianame); |
397 |
} |
398 |
args[8] = args8; |
399 |
/***********The dsaiabad is obsolete*************************************** |
400 |
if(hmiaiaflg) { //aia has an extra arg to build_lev1 |
401 |
sprintf(args9, "dsaiabad=%s", DSAIABAD); |
402 |
args[9] = args9; |
403 |
args[10] = NULL; |
404 |
printk("execvp: %s %s %s %s %s %s %s %s %s %s\n", |
405 |
args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8],args[9]); |
406 |
} |
407 |
else { |
408 |
***************************************************************************/ |
409 |
args[9] = NULL; |
410 |
printk("execvp: %s %s %s %s %s %s %s %s %s\n", |
411 |
args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]); |
412 |
//} |
413 |
if(execvp(args[0], args) < 0) { |
414 |
switch(instruflg) { |
415 |
case 0: |
416 |
printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno); |
417 |
break; |
418 |
case 1: |
419 |
printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno); |
420 |
break; |
421 |
case 2: |
422 |
printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno); |
423 |
break; |
424 |
} |
425 |
exit(1); |
426 |
} |
427 |
} |
428 |
--numtofork; |
429 |
printf("forked pid = %d\n", pid); |
430 |
fpid[k] = pid; |
431 |
} |
432 |
wpid = -1; //wait for any child |
433 |
while(1) { |
434 |
//don't block and report any status not yet reported |
435 |
pid = waitpid(wpid, &stat_loc, WNOHANG+WUNTRACED); |
436 |
//pid = waitpid(wpid, &stat_loc, WNOHANG); |
437 |
//printf("!!TEMP waitpid returned %d stat_loc=%d\n", pid, stat_loc); |
438 |
if(pid == 0) { sleep(5); continue; } //nothing ready |
439 |
if(pid == -1) { |
440 |
if(errno == ECHILD) printf("!!No More Children\n");errno=0; |
441 |
//!!TBD assumes we catch up at some point and we know where we're at |
442 |
//now and can update the DB table. Check that this is ok. |
443 |
//now update lev1_highest_lev0_recnum table with lrec |
444 |
sprintf(pcmd, "echo \"update lev1_highest_lev0_recnum set lev0recnum=%lld, date='%s' where lev0series='%s'\" | psql -h hmidb -U production jsoc", lrec, get_datetime(), dsin); |
445 |
printf("%s\n", pcmd); //!!TEMP echo |
446 |
system(pcmd); |
447 |
if(stat(stopfile, &stbuf) == 0) { |
448 |
printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile); |
449 |
abortnow = 1; //don't wait for any more lev 0records |
450 |
} |
451 |
if(numtofork <= 0) break; |
452 |
} |
453 |
else { |
454 |
for(k=0; k < numcpu; k++) { //make sure a good one replies and |
455 |
if(fpid[k] == pid) { break; } //don't have to worry about wraparound |
456 |
} |
457 |
if(k == numcpu) continue; //extraneous pid get on first wait |
458 |
printf("returned pid = %d stat_loc = %d\n", pid, stat_loc); |
459 |
if(numtofork == 0) continue; //find out no more children |
460 |
} |
461 |
//fork one more unless the stopfile is seen |
462 |
if(stat(stopfile, &stbuf) == 0) { |
463 |
printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile); |
464 |
numtofork = 0; //no new forks |
465 |
abortnow = 1; //don't wait for any more lev 0records |
466 |
continue; |
467 |
} |
468 |
frec = lrec+1; lrec = (frec + numrec)-1; |
469 |
if((pid = fork()) < 0) { |
470 |
printk("***Can't fork(). errno=%d\n", errno); |
471 |
return(1); //!!TBD decide what to do |
472 |
} |
473 |
else if(pid == 0) { //this is the beloved child |
474 |
switch(instruflg) { |
475 |
case 0: |
476 |
args[0] = "build_lev1_hmi"; |
477 |
break; |
478 |
case 1: |
479 |
args[0] = "build_lev1_aia"; |
480 |
break; |
481 |
case 2: |
482 |
args[0] = "build_lev1_iris"; |
483 |
break; |
484 |
} |
485 |
sprintf(args1, "mode=%s", mode); |
486 |
args[1] = args1; |
487 |
sprintf(args2, "dsin=%s", dsin); |
488 |
args[2] = args2; |
489 |
sprintf(args3, "dsout=%s", dsout); |
490 |
args[3] = args3; |
491 |
if(modeflg) { //recnum mode |
492 |
sprintf(args4, "brec=%lld", frec); |
493 |
args[4] = args4; |
494 |
sprintf(args5, "erec=%lld", lrec); |
495 |
args[5] = args5; |
496 |
} |
497 |
else { |
498 |
sprintf(args4, "bfsn=%lld", frec); |
499 |
args[4] = args4; |
500 |
sprintf(args5, "efsn=%lld", lrec); |
501 |
args[5] = args5; |
502 |
} |
503 |
sprintf(args6, "instru=%s", instru); |
504 |
args[6] = args6; |
505 |
sprintf(args7, "quicklook=%d", quicklook); |
506 |
args[7] = args7; |
507 |
if(LOGTEST) { |
508 |
sprintf(args8, "%s", args8sv); //use original log name |
509 |
} |
510 |
else { |
511 |
sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log", |
512 |
LEV1LOG_BASEDIR, frec, lrec, hmiaianame); |
513 |
} |
514 |
args[8] = args8; |
515 |
/***********The dsaiabad is obsolete*************************************** |
516 |
if(hmiaiaflg) { //aia has an extra arg to build_lev1 |
517 |
sprintf(args9, "dsaiabad=%s", DSAIABAD); |
518 |
args[9] = args9; |
519 |
args[10] = NULL; |
520 |
printk("execvp: %s %s %s %s %s %s %s %s %s %s\n", |
521 |
args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8],args[9]); |
522 |
} |
523 |
else { |
524 |
***************************************************************************/ |
525 |
args[9] = NULL; |
526 |
printk("execvp: %s %s %s %s %s %s %s %s %s\n", |
527 |
args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]); |
528 |
//} |
529 |
if(execvp(args[0], args) < 0) { |
530 |
switch(instruflg) { |
531 |
case 0: |
532 |
printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno); |
533 |
break; |
534 |
case 1: |
535 |
printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno); |
536 |
break; |
537 |
case 2: |
538 |
printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno); |
539 |
break; |
540 |
} |
541 |
exit(1); |
542 |
} |
543 |
} |
544 |
--numtofork; |
545 |
printf("forked pid = %d\n", pid); |
546 |
fpid[k] = pid; |
547 |
} |
548 |
return(0); |
549 |
} |
550 |
|
551 |
//Start a qsub job. Set the global jid. |
552 |
//The args are either recnum of fsn according to modeflg. |
553 |
int qsubjob(long long rec1, long long rec2) |
554 |
{ |
555 |
FILE *fin; |
556 |
char astr[32], bstr[32], string[128], qlogname[128], qsubcmd[512];; |
557 |
char recrange[128]; |
558 |
|
559 |
if(modeflg) sprintf(recrange, ":#%lld-#%lld", rec1, rec2); |
560 |
else sprintf(recrange, "%lld-%lld", rec1, rec2); |
561 |
sprintf(open_dsname, "%s[%s]", dsin, recrange); |
562 |
printk("open_dsname = %s\n", open_dsname); //!!TEMP |
563 |
//sprintf(qlogname, "%s/qsub_prod_%d_%d.csh", QSUBDIR, mypid, qcnt++); |
564 |
sprintf(qlogname, "%s/p_%d_%d.csh", QSUBDIR, mypid, qcnt++); |
565 |
if((qsubfp=fopen(qlogname, "w")) == NULL) { |
566 |
fprintf(stderr, "**Can't open the qsub log file %s\n", qlogname); |
567 |
return(1); //!!TBD |
568 |
} |
569 |
fprintf(qsubfp, "#!/bin/csh\n"); |
570 |
fprintf(qsubfp, "limit vm 1000M\n"); |
571 |
fprintf(qsubfp, "limit coredumpsize 0\n"); |
572 |
fprintf(qsubfp, "echo \"TMPDIR = $TMPDIR\"\n"); |
573 |
|
574 |
if(modeflg) { //recnum mode |
575 |
switch(instruflg) { |
576 |
case 0: |
577 |
fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
578 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
579 |
break; |
580 |
case 1: |
581 |
fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
582 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
583 |
break; |
584 |
case 2: |
585 |
fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
586 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
587 |
break; |
588 |
} |
589 |
} |
590 |
else { //fsn mode |
591 |
switch(instruflg) { |
592 |
case 0: |
593 |
fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
594 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
595 |
break; |
596 |
case 1: |
597 |
fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
598 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
599 |
break; |
600 |
case 2: |
601 |
fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
602 |
mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2); |
603 |
break; |
604 |
} |
605 |
} |
606 |
/********************Elim force of stream mode******************************* |
607 |
if(modeflg) { //recnum mode |
608 |
if(hmiaiaflg) { //aia has an extra arg to build_lev1 |
609 |
fprintf(qsubfp, "build_lev1_aia --loopconn mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
610 |
mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2); |
611 |
} |
612 |
else { |
613 |
fprintf(qsubfp, "build_lev1_hmi --loopconn mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
614 |
mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2); |
615 |
} |
616 |
} |
617 |
else { //fsn mode |
618 |
if(hmiaiaflg) { //aia has an extra arg to build_lev1 |
619 |
fprintf(qsubfp, "build_lev1_aia --loopconn mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
620 |
mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2); |
621 |
} |
622 |
else { |
623 |
fprintf(qsubfp, "build_lev1_hmi --loopconn mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=1 logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n", |
624 |
mode, dsin, dsout, rec1, rec2, instru, QSUBDIR, rec1, rec2); |
625 |
} |
626 |
} |
627 |
****************************************************************************/ |
628 |
fclose(qsubfp); |
629 |
sprintf(qsubcmd, "qsub -o %s -e %s -q p.q %s", |
630 |
QSUBDIR, QSUBDIR, qlogname); //note: uses p.q |
631 |
//sprintf(qsubcmd, "qsub -o %s -e %s -q j.q %s", |
632 |
// QSUBDIR, QSUBDIR, qlogname); |
633 |
//sprintf(qsubcmd, "qsub -q j.q %s", qlogname); |
634 |
printf("%s\n", qsubcmd); |
635 |
printk("%s\n", qsubcmd); |
636 |
/********!!TEMP noop out*********************************************/ |
637 |
sprintf(qsubcmd, "%s | grep \"Your job\"", qsubcmd); |
638 |
fin = popen(qsubcmd, "r"); |
639 |
while(fgets(string, sizeof string, fin)) { //get qsub return line |
640 |
sscanf(string, "%s %s %d", astr, bstr, &jid); // get job_id |
641 |
} |
642 |
pclose(fin); |
643 |
printf("\$JOB_ID = %u\n", jid); |
644 |
/********!!TEMP end noop out*********************************************/ |
645 |
return(0); |
646 |
} |
647 |
|
648 |
int qsubmode(long long frec, long long lrec) |
649 |
{ |
650 |
FILE *fin; |
651 |
char qsubcmd[512], string[128]; |
652 |
char astr[32], bstr[32], jidstr[MAXJIDSTR]; |
653 |
uint64_t qjid[MAXQSUBLEV1], qstatjid[MAXQSUBLEV1]; |
654 |
long long numofrecs, rfirst, rlast; |
655 |
int numtoqsub, i, j, l, k, found, status; |
656 |
int jobdone=0; |
657 |
|
658 |
numofrecs = (lrec - frec) + 1; |
659 |
numtoqsub = numofrecs/numrec; //this many to qsub |
660 |
if(numofrecs % numrec) numtoqsub++; |
661 |
j = numtoqsub; //0 implies one to qsub |
662 |
rlast = frec-1; |
663 |
if(j < numqsub) l = j; //qsub less then the max at a time |
664 |
else l = numqsub; //qsub the max at a time |
665 |
for(k=0; k < l; k++) { //qsub this many to start |
666 |
rfirst = rlast+1; rlast = (rfirst + numrec)-1; |
667 |
if(rlast > lrec) rlast = lrec; //don't go past end |
668 |
status = qsubjob(rfirst, rlast); //schedule the qsub job. set jid |
669 |
--numtoqsub; |
670 |
qjid[k] = jid; |
671 |
if(k == 0) sprintf(jidstr, "%u", jid); |
672 |
else sprintf(jidstr, "%s,%u", jidstr, jid); |
673 |
} |
674 |
//printf("jidstr = %s\n", jidstr); //!!TEMP |
675 |
printf("numtoqsub left = %d\n", numtoqsub); //!!TEMP |
676 |
//sprintf(qsubcmd, "qstat -j %s 2>/dev/null | grep \"job_number:\"", jidstr); |
677 |
//sprintf(qsubcmd, "qstat -u production | grep \"qsub_prod_%d\"", mypid); |
678 |
//the grep string is from qlogname set in qsubjob() |
679 |
//sprintf(qsubcmd, "qstat -u production | grep \"p_%d_\"", mypid); |
680 |
sprintf(qsubcmd, "qstat -u %s | grep \"p_%d_\"", username, mypid); |
681 |
while(1) { |
682 |
//printf("\ncmd: %s\n", qsubcmd); //!!TEMP |
683 |
if(!(fin = popen(qsubcmd, "r"))) { |
684 |
printf("!!!Fatal Error: can't do %s\n", qsubcmd); |
685 |
return(1); //!!TBD check |
686 |
} |
687 |
//sleep(12); |
688 |
found = 0; k = 0; |
689 |
while(fgets(string, sizeof string, fin)) { //get qstat return line |
690 |
//sscanf(string, "%s %u", astr, &jid); // get job_id |
691 |
sscanf(string, "%u", &jid); // get job_id |
692 |
printf("job id from qstat = %u\n", jid); |
693 |
qstatjid[k++] = jid; |
694 |
found = 1; |
695 |
} |
696 |
pclose(fin); |
697 |
|
698 |
//now see if any of the started jobs are done |
699 |
for(i=0; i < l; i++) { |
700 |
for(j=0; j < k; j++) { |
701 |
if(qjid[i] == qstatjid[j]) { //job still active |
702 |
break; |
703 |
} |
704 |
} |
705 |
if(j == k) { //job done. start a new one |
706 |
if(qjid[i] != 0) { |
707 |
printf("Job done jid=%u\n", qjid[i]); |
708 |
jobdone++; |
709 |
qjid[i] = 0; |
710 |
} |
711 |
if(numtoqsub) { |
712 |
//start_new_qusb |
713 |
rfirst = rlast+1; rlast = (rfirst + numrec)-1; |
714 |
if(rlast > lrec) rlast = lrec; //don't go past end |
715 |
status = qsubjob(rfirst, rlast); //schedule the qsub job |
716 |
--numtoqsub; |
717 |
found = 1; //job to be found again |
718 |
qjid[i] = jid; |
719 |
//if(k == 0) sprintf(jidstr, "%u", jid); |
720 |
//else sprintf(jidstr, "%s,%u", jidstr, jid); |
721 |
} |
722 |
//else break; //all done |
723 |
} |
724 |
} |
725 |
for(i=0; i < l; i++) { |
726 |
if(i == 0) sprintf(jidstr, "%u", qjid[i]); |
727 |
else sprintf(jidstr, "%s,%u", jidstr, qjid[i]); |
728 |
} |
729 |
printf("\n"); |
730 |
if(!found) break; |
731 |
sleep(3); |
732 |
} |
733 |
printf("All jobs done = %d. numtoqsub = %d\n", jobdone, numtoqsub); |
734 |
return(0); |
735 |
} |
736 |
|
737 |
/* Create lev1 from lev0 records in either stream mode or |
738 |
* range mode. Return non-0 on error. |
739 |
* In stream mode force any non-chunk size of lev0 records at the |
740 |
* end to lev1. |
741 |
*/ |
742 |
int do_ingest(int force) |
743 |
{ |
744 |
FILE *fin; |
745 |
int rstatus; |
746 |
long long recnum0, maxrecnum0; |
747 |
char string[128], pcmd[128]; |
748 |
|
749 |
if(stream_mode) { //start past last lev0 rec# processed |
750 |
sprintf(pcmd, "echo \"select lev0recnum from lev1_highest_lev0_recnum where lev0series='%s'\" | psql -h hmidb -U production jsoc", dsin); |
751 |
fin = popen(pcmd, "r"); |
752 |
while(fgets(string, sizeof string, fin)) { //get psql return line |
753 |
if(strstr(string, "lev0recnum")) continue; |
754 |
if(strstr(string, "-----")) continue; |
755 |
//get lev0 rec# |
756 |
if((rstatus = sscanf(string, "%lld", &recnum0)) == 0) { |
757 |
printf("Abort no lev0 entry in lev1_highest_lev0_recnum\n"); |
758 |
printk("Abort no lev0 entry in lev1_highest_lev0_recnum\n"); |
759 |
abortit(1); //no rec# |
760 |
} |
761 |
recnum0++; //start at next rec# |
762 |
break; |
763 |
} |
764 |
pclose(fin); |
765 |
sprintf(pcmd, "echo \"select max(recnum) from %s\" | psql -h hmidb -U production jsoc", dsin); |
766 |
fin = popen(pcmd, "r"); |
767 |
while(fgets(string, sizeof string, fin)) { //get psql return line |
768 |
if(strstr(string, "max")) continue; |
769 |
if(strstr(string, "-----")) continue; |
770 |
if(!strcmp(string, " \n")) { //new series w/no recnum |
771 |
printf("Abort no max lev0 recnum (new series?)\n"); |
772 |
printk("Abort no max lev0 recnum (new series?)\n"); |
773 |
abortit(1); |
774 |
} |
775 |
sscanf(string, "%lld", &maxrecnum0); |
776 |
//maxrecnum0 = maxrecnum0 - 25; //allow time for commit of lev0 |
777 |
//maxrecnum0 = maxrecnum0 - 40; //allow (more) time for commit of lev0 |
778 |
if(instruflg == 2) maxrecnum0 = maxrecnum0 - 200; //allow (more) time for iris |
779 |
else maxrecnum0 = maxrecnum0 - 80; //allow (more) time for commit of lev0 |
780 |
lastrecnum0_prev = lastrecnum0_now; |
781 |
lastrecnum0_now = maxrecnum0; //save to see if more come in |
782 |
break; |
783 |
} |
784 |
pclose(fin); |
785 |
printf("Stream Mode starting at lev0 recnum = %lld maxrecnum = %lld\n", |
786 |
recnum0, maxrecnum0); |
787 |
if(recnum0 > maxrecnum0) return(0); //nothing to do. go wait |
788 |
rstatus = forkstream(recnum0, maxrecnum0, force); |
789 |
} |
790 |
else { |
791 |
if(modeflg) { //recnum mode |
792 |
//range mode. use brec/erec and qsub build_lev1 programs |
793 |
rstatus = qsubmode(brec, erec); |
794 |
} |
795 |
else { //fsn mode |
796 |
rstatus = qsubmode(bfsn, efsn); |
797 |
} |
798 |
} |
799 |
return(rstatus); |
800 |
} |
801 |
|
802 |
void sighandler(sig) |
803 |
int sig; |
804 |
{ |
805 |
char pcmd[128]; |
806 |
if(sig == SIGTERM) { |
807 |
printf("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr); |
808 |
printk("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr); |
809 |
sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
810 |
LEV1LOG_BASEDIR, hmiaianame); |
811 |
system(pcmd); |
812 |
exit(1); |
813 |
} |
814 |
if(sig == SIGINT) { |
815 |
printf("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr); |
816 |
printk("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr); |
817 |
sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
818 |
LEV1LOG_BASEDIR, hmiaianame); |
819 |
system(pcmd); |
820 |
exit(1); |
821 |
} |
822 |
printk("*** %s build_lev1_mgr got an illegal signal %d, ignoring...\n", |
823 |
datestr, sig); |
824 |
if (signal(SIGINT, SIG_IGN) != SIG_IGN) |
825 |
signal(SIGINT, sighandler); |
826 |
if (signal(SIGALRM, SIG_IGN) != SIG_IGN) |
827 |
signal(SIGALRM, sighandler); |
828 |
} |
829 |
|
830 |
// Initial setup stuff called when main is first entered. |
831 |
void setup() |
832 |
{ |
833 |
FILE *fin; |
834 |
char string[128], cwdbuf[128], idstr[256], lfile[128]; |
835 |
int tpid; |
836 |
|
837 |
do_datestr(); |
838 |
printk_set(h1log, h1log); // set for printk calls |
839 |
printk("%s\n", datestr); |
840 |
getcwd(cwdbuf, 126); |
841 |
mypid = getpid(); |
842 |
sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf); |
843 |
sprintf(string, "%s started as pid=%d user=%s\n", module_name, mypid, username); |
844 |
strcat(idstr, string); |
845 |
printk("%s", idstr); |
846 |
printf("%s", idstr); |
847 |
switch(instruflg) { |
848 |
case 0: |
849 |
sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_hmi"); |
850 |
break; |
851 |
case 1: |
852 |
sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_aia"); |
853 |
break; |
854 |
case 2: |
855 |
sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_iris"); |
856 |
break; |
857 |
} |
858 |
if(stream_mode) { |
859 |
sprintf(string, "/bin/rm -f %s", stopfile); //remove any stop file |
860 |
system(string); |
861 |
} |
862 |
sprintf(argmode, "mode=%s", mode); |
863 |
sprintf(arginstru, "instru=%s", instru); |
864 |
sprintf(argdsin, "dsin=%s", dsin); |
865 |
sprintf(argdsout, "dsout=%s", dsout); |
866 |
if(modeflg) { |
867 |
sprintf(argbx, "brec=%lld", brec); |
868 |
sprintf(argex, "erec=%lld", erec); |
869 |
} |
870 |
else { |
871 |
sprintf(argbx, "bfsn=%lld", bfsn); |
872 |
sprintf(argex, "efsn=%lld", efsn); |
873 |
} |
874 |
sprintf(argnumrec, "numrec=%d", numrec); |
875 |
sprintf(argnumcpu, "numcpu=%d", numcpu); |
876 |
sprintf(argnumqsub, "numqsub=%d", numqsub); |
877 |
sprintf(arglogfile, "logfile=%s", logname); |
878 |
printk("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub); |
879 |
printf("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub); |
880 |
if (signal(SIGINT, SIG_IGN) != SIG_IGN) |
881 |
signal(SIGINT, sighandler); |
882 |
if (signal(SIGTERM, SIG_IGN) != SIG_IGN) |
883 |
signal(SIGTERM, sighandler); |
884 |
if (signal(SIGALRM, SIG_IGN) != SIG_IGN) |
885 |
signal(SIGALRM, sighandler); |
886 |
sprintf(idstr, "ps -ef | grep %s", LEV1VIEWERNAME); |
887 |
fin = popen(idstr, "r"); |
888 |
while(fgets(string, sizeof string, fin)) { //get ps line |
889 |
if(!(strstr(string, "perl"))) continue; |
890 |
sscanf(string, "%s %d", idstr, &tpid); // get user name & process id |
891 |
sprintf(lfile, "%s/build_lev1_mgr_restart_%d.touch", LEV1LOG_BASEDIR, tpid); |
892 |
sprintf(idstr, "/bin/touch %s", lfile); |
893 |
printk("%s\n", idstr); |
894 |
system(idstr); |
895 |
} |
896 |
umask(002); // allow group write |
897 |
} |
898 |
|
899 |
// Module main function. |
900 |
int main(int argc, char **argv) |
901 |
{ |
902 |
FILE *fin; |
903 |
int wflg = 1; |
904 |
char line[128], pcmd[128]; |
905 |
|
906 |
if (cmdparams_parse(&cmdparams, argc, argv) == -1) |
907 |
{ |
908 |
fprintf(stderr,"Error: Command line parsing failed. Aborting.\n"); |
909 |
return 1; |
910 |
} |
911 |
if (nice_intro()) |
912 |
return (0); |
913 |
if(!(username = (char *)getenv("USER"))) username = "nouser"; |
914 |
instru = cmdparams_get_str(&cmdparams, "instru", NULL); |
915 |
if(strcmp(instru, "hmi") && strcmp(instru, "aia") && strcmp(instru, "iris")) { |
916 |
printf("instru= %s\n", instru); //!!TEMP |
917 |
printf("Error: instru= must be given as 'hmi' or 'aia' or 'iris'\n"); |
918 |
return(0); |
919 |
} |
920 |
if(!strcmp(instru, "aia")) { |
921 |
instruflg = 1; |
922 |
sprintf(hmiaianame, "aia"); |
923 |
} |
924 |
else if(!strcmp(instru, "hmi")) { |
925 |
instruflg = 0; |
926 |
sprintf(hmiaianame, "hmi"); |
927 |
} |
928 |
else { |
929 |
instruflg = 2; |
930 |
sprintf(hmiaianame, "iris"); |
931 |
} |
932 |
mode = cmdparams_get_str(&cmdparams, "mode", NULL); |
933 |
if(strcmp(mode, "recnum") && strcmp(mode, "fsn")) { |
934 |
printf("Error: mode= must be given as 'recnum' or 'fsn'\n"); |
935 |
return(0); |
936 |
} |
937 |
if(!strcmp(mode, "recnum")) modeflg = 1; |
938 |
dsin = cmdparams_get_str(&cmdparams, "dsin", NULL); |
939 |
dsout = cmdparams_get_str(&cmdparams, "dsout", NULL); |
940 |
brec = cmdparams_get_int(&cmdparams, "brec", NULL); |
941 |
erec = cmdparams_get_int(&cmdparams, "erec", NULL); |
942 |
bfsn = cmdparams_get_int(&cmdparams, "bfsn", NULL); |
943 |
efsn = cmdparams_get_int(&cmdparams, "efsn", NULL); |
944 |
quicklook = cmdparams_get_int(&cmdparams, "quicklook", NULL); |
945 |
numrec = cmdparams_get_int(&cmdparams, "numrec", NULL); |
946 |
numcpu = cmdparams_get_int(&cmdparams, "numcpu", NULL); |
947 |
numqsub = cmdparams_get_int(&cmdparams, "numqsub", NULL); |
948 |
if(numcpu > MAXCPULEV1) { |
949 |
printf("numcpu exceeds max of %d\n", MAXCPULEV1); |
950 |
return(0); |
951 |
} |
952 |
if(numqsub > MAXQSUBLEV1) { |
953 |
printf("numqsub exceeds max of %d\n", MAXQSUBLEV1); |
954 |
return(0); |
955 |
} |
956 |
if(modeflg) { //recnum mode |
957 |
if(brec == -1 || erec == -1) { |
958 |
fprintf(stderr, "brec and erec must be given. -1 not allowed\n"); |
959 |
fprintf(stderr, "use brec=0 erec=0 to process in stream mode\n"); |
960 |
return(0); |
961 |
} |
962 |
if(brec > erec) { |
963 |
fprintf(stderr, "brec must be <= erec\n"); |
964 |
return(0); |
965 |
} |
966 |
if(brec == 0 && erec == 0) { |
967 |
//make sure there isn't a stream mode already running |
968 |
sprintf(pcmd, "ls %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
969 |
LEV1LOG_BASEDIR, hmiaianame); |
970 |
fin = popen(pcmd, "r"); |
971 |
while(fgets(line, sizeof line, fin)) { //get ps return line |
972 |
printf("Error: build_lev1_mgr already running."); |
973 |
printf(" If not so, do:\n"); |
974 |
printf("/bin/rm %s/build_lev1_mgr_%s.stream.touch\n", |
975 |
LEV1LOG_BASEDIR, hmiaianame); |
976 |
pclose(fin); |
977 |
return(0); |
978 |
} |
979 |
pclose(fin); |
980 |
sprintf(pcmd, "touch %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
981 |
LEV1LOG_BASEDIR, hmiaianame); |
982 |
system(pcmd); |
983 |
stream_mode = 1; //aka quick look mode |
984 |
quicklook = 1; //and force quicklook to 1 |
985 |
} |
986 |
} |
987 |
else { //fsn mode |
988 |
if(bfsn == -1 || efsn == -1) { |
989 |
fprintf(stderr, "bfsn and efsn must be given. -1 not allowed\n"); |
990 |
return(0); |
991 |
} |
992 |
if(bfsn == 0 || efsn == 0) { |
993 |
fprintf(stderr, "bfsn and efsn must be given. 0 not allowed\n"); |
994 |
return(0); |
995 |
} |
996 |
if(bfsn > efsn) { |
997 |
fprintf(stderr, "bfsn must be <= efsn\n"); |
998 |
return(0); |
999 |
} |
1000 |
} |
1001 |
logfile = cmdparams_get_str(&cmdparams, "logfile", NULL); |
1002 |
if (strcmp(dsin, NOTSPECIFIED) == 0) { |
1003 |
switch(instruflg) { |
1004 |
case 0: |
1005 |
dsin = LEV0SERIESNAMEHMI; |
1006 |
break; |
1007 |
case 1: |
1008 |
dsin = LEV0SERIESNAMEAIA; |
1009 |
break; |
1010 |
case 2: |
1011 |
dsin = LEV0SERIESNAMEIRIS; |
1012 |
break; |
1013 |
} |
1014 |
} |
1015 |
if (strcmp(dsout, NOTSPECIFIED) == 0) { |
1016 |
switch(instruflg) { |
1017 |
case 0: |
1018 |
dsout = LEV1SERIESNAMEHMI; |
1019 |
break; |
1020 |
case 1: |
1021 |
dsout = LEV1SERIESNAMEAIA; |
1022 |
break; |
1023 |
case 2: |
1024 |
dsout = LEV1SERIESNAMEIRIS; |
1025 |
break; |
1026 |
} |
1027 |
} |
1028 |
if (strcmp(logfile, NOTSPECIFIED) == 0) { |
1029 |
sprintf(logname, H1LOGFILE, hmiaianame, gettimetag()); |
1030 |
} |
1031 |
else { |
1032 |
sprintf(logname, "%s", logfile); |
1033 |
} |
1034 |
if((h1logfp=fopen(logname, "w")) == NULL) |
1035 |
fprintf(stderr, "**Can't open the log file %s\n", logname); |
1036 |
setup(); |
1037 |
while(wflg) { |
1038 |
if(do_ingest(0)) { // loop to get files from the lev0 |
1039 |
printk("**ERROR: in do_ingest() for %s\n", open_dsname); |
1040 |
} |
1041 |
if(!stream_mode) return(0); //all done for reprocessing |
1042 |
if(abortnow) break; |
1043 |
sleep(30); //wait for more lev0 to appear |
1044 |
if(lastrecnum0_now == lastrecnum0_prev) { //no new lev0 in |
1045 |
if(loopcnt++ == 15) { //force any left over lev0 records to lev1 |
1046 |
printk("Timeout: force any left over lev0 to lev1\n"); |
1047 |
if(do_ingest(1)) { // process the last lev0 records |
1048 |
printk("**ERROR: in do_ingest() for %s\n", open_dsname); |
1049 |
} |
1050 |
loopcnt = 0; |
1051 |
} |
1052 |
} |
1053 |
else loopcnt = 0; |
1054 |
//wflg = 0; |
1055 |
} |
1056 |
sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null", |
1057 |
LEV1LOG_BASEDIR, hmiaianame); |
1058 |
system(pcmd); |
1059 |
return(0); |
1060 |
} |
1061 |
|