ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/JSOC/proj/lev0/apps/extract_fds_statev.c
Revision: 1.24
Committed: Fri Dec 21 17:55:27 2012 UTC (10 years, 9 months ago) by arta
Content type: text/plain
Branch: MAIN
CVS Tags: Ver_9-1, Ver_LATEST, Ver_9-3, Ver_9-41, Ver_9-2, Ver_8-8, Ver_8-2, Ver_8-3, Ver_8-0, Ver_8-1, Ver_8-6, Ver_8-7, Ver_8-4, Ver_8-5, Ver_9-5, Ver_9-4, Ver_8-10, Ver_8-11, Ver_8-12, Ver_9-0, HEAD
Changes since 1.23: +19 -9 lines
Log Message:
Add a call to drms_count_records(). This module uses drms_open_recordset(), which has no knowledge of the number of records.

File Contents

# Content
/* extract_fds_statev.c (module name: extractFdsStateV) */

/* Extracts heliocentric and geocentric orbit state vectors (position and velocity) from
 * FDS orbit data files and stores them, paired by observation date, in an output series.
 * Inputs:
 *   Data series in  - the fully qualified name of the data series that contains the FDS data
 *   Time range      - the range of dates to which the extraction should be restricted
 *   Data series out - the fully qualified name of the data series in which to store
 *                     the extracted data
 */
10
11 #include <stdio.h>
12 #include <stdlib.h>
13
14 #include "jsoc_main.h"
15 #include "cmdparams.h"
16 #include "drms_env.h"
17
/* Compile-time switch for the timing/diagnostic code in the #if DEBUG blocks. */
#define DEBUG 0

/* ---- Default series names and FDS record keyword names ---- */
#define kDefaultNamespace      "sdo"
#define kDefaultSeriesIn       "fds"
#define kDefaultSeriesOut      "fds_orbit_vectors"
#define kSeriesHistory         "fds_orbit_ingesthist"
#define kObsDateKey            "OBS_DATE"
#define kFdsDataProductKey     "FDS_DATA_PRODUCT"
#define kFdsProductCompKey     "FDS_PRODUCT_COMP"
#define kFdsDataFormatKey      "DATA_FORMAT"
#define kFileVersionKey        "FILE_VERSION"
#define kFdsProductCompHELIO   "ORBIT_HELIO"
#define kFdsProductCompGEO     "ORBIT_GEO"
#define kNotSpecified          "NOT SPECIFIED"

/* Name of the segment of the input series (presumably the state-vector data file). */
#define kSVFileSegName         "FILENAME"

/* ---- Buffer sizes ---- */
#define kSVLineMax             1024   /* max line length read from a data file with fgets() */
#define kMaxHashKey            1024

/* ---- Keywords of the orbit-vector output series ---- */
#define kSVKeyPrimary          "OBS_DATE"

/* heliocentric position / velocity */
#define kSVKeyXHELIO           "HCIEC_X"
#define kSVKeyYHELIO           "HCIEC_Y"
#define kSVKeyZHELIO           "HCIEC_Z"
#define kSVKeyVxHELIO          "HCIEC_VX"
#define kSVKeyVyHELIO          "HCIEC_VY"
#define kSVKeyVzHELIO          "HCIEC_VZ"

/* geocentric position / velocity */
#define kSVKeyXGEO             "GCIEC_X"
#define kSVKeyYGEO             "GCIEC_Y"
#define kSVKeyZGEO             "GCIEC_Z"
#define kSVKeyVxGEO            "GCIEC_VX"
#define kSVKeyVyGEO            "GCIEC_VY"
#define kSVKeyVzGEO            "GCIEC_VZ"

/* record-creation date */
#define kSVKeyDATE             "DATE"

/* identifiers of the source data for each set of vectors */
#define kSVKey_idHELIO         "id_HELIO"
#define kSVKey_idGEO           "id_GEO"

/* ---- Record chunking / record-cache key size (FetchCachedRecord) ---- */
#define gChunkSize             16384
#define gCacheKeySize          128

/* ---- String-member sizes of VectorNode_t ---- */
#define kTOBSMaxLen            64
#define kIDMaxLen              256
62
63 struct VectorNode_struct
64 {
65 char tobs[kTOBSMaxLen];
66 double hcix;
67 double hciy;
68 double hciz;
69 double hcivx;
70 double hcivy;
71 double hcivz;
72 char hciID[kIDMaxLen];
73 double gcix;
74 double gciy;
75 double gciz;
76 double gcivx;
77 double gcivy;
78 double gcivz;
79 char gciID[kIDMaxLen];
80 };
81
82 typedef struct VectorNode_struct VectorNode_t;
83
84 typedef HContainer_t VectorCache_t;
85
86 ModuleArgs_t module_args[] =
87 {
88 {ARG_STRING, "ns", kDefaultNamespace, "working namespace (sdo, sdo_ground, sdo_dev)"},
89 {ARG_STRING, "seriesIn", kDefaultSeriesIn, "name of series containing FDS data"},
90 {ARG_STRING, "timeRange", kNotSpecified, "DRMS time interval encompassing data file dates"},
91 {ARG_STRING, "seriesOut", kDefaultSeriesOut, "name of series in which to save extracted data"},
92 {ARG_STRING, "owner", kNotSpecified, "name of database user who owns output series"},
93 {ARG_FLAG, "c", "0", "create the series only"},
94 {ARG_END}
95 };
96
97 char *module_name = "extractFdsStateV";
98
99 int nice_intro ()
100 {
101 int usage = cmdparams_get_int(&cmdparams, "h", NULL);
102 if (usage)
103 {
104 printf ("Usage:\n\textractFdsStateV [-h] "
105 "[seriesIn=<seriesname>] [timeRange=<timerange>] [seriesOut=<seriesname>]\n"
106 " details are:\n"
107 " -h: help - show this message then exit\n"
108 " <seriesname> - fully qualified series name.\n"
109 " <timerange> - time value range set.\n"
110 " seriesIn defaults to sdo.fds.\n"
111 " timeRange defaults to all records in seriesIn.\n"
112 " seriesOut defaults to sdo.fdsStateVectors.\n"
113 " example - extractFdsStateV seriesIn=su_arta.TestFDSData timeRange=2006.11.20_22:38:00-2006.11.20_22:45:00,2006.11.20_22:52:00. seriesOut=su_arta.TestFDSHelio\n");
114 return(1);
115 }
116 return (0);
117 }
118
119 static VectorCache_t *CreateVectorCache()
120 {
121 VectorCache_t *cache = (VectorCache_t *)malloc(sizeof(VectorCache_t));
122 hcon_init_ext((HContainer_t *)cache, 49999, sizeof(VectorNode_t), kTOBSMaxLen, NULL, NULL);
123 return cache;
124 }
125
126 static void DestroyVectorCache(VectorCache_t **cache)
127 {
128 if (cache)
129 {
130 if (*cache)
131 {
132 hcon_destroy((HContainer_t **)cache);
133 }
134 }
135 }
136
137 static void VCacheCache(VectorCache_t *cache,
138 char *tobs,
139 double *hcix,
140 double *hciy,
141 double *hciz,
142 double *hcivx,
143 double *hcivy,
144 double *hcivz,
145 char *hciID,
146 double *gcix,
147 double *gciy,
148 double *gciz,
149 double *gcivx,
150 double *gcivy,
151 double *gcivz,
152 char *gciID)
153 {
154 if (cache && tobs && *tobs)
155 {
156 #if DEBUG
157 /* the time to alloc WAS growing */
158 TIMER_t *timer = CreateTimer();
159 VectorNode_t *node = (VectorNode_t *)hcon_allocslot_lower(cache, tobs);
160 fprintf(stdout, "alloc slot: %f seconds.\n", GetElapsedTime(timer));
161 DestroyTimer(&timer);
162 #else
163 VectorNode_t *node = (VectorNode_t *)hcon_allocslot_lower(cache, tobs);
164 #endif
165 snprintf(node->tobs, kTOBSMaxLen, "%s", tobs);
166
167 if (hcix)
168 {
169 node->hcix = *hcix;
170 }
171 else
172 {
173 node->hcix = DRMS_MISSING_DOUBLE;
174 }
175 if (hciy)
176 {
177 node->hciy = *hciy;
178 }
179 else
180 {
181 node->hciy = DRMS_MISSING_DOUBLE;
182 }
183 if (hciz)
184 {
185 node->hciz = *hciz;
186 }
187 else
188 {
189 node->hciz = DRMS_MISSING_DOUBLE;
190 }
191 if (hcivx)
192 {
193 node->hcivx = *hcivx;
194 }
195 else
196 {
197 node->hcivx = DRMS_MISSING_DOUBLE;
198 }
199 if (hcivy)
200 {
201 node->hcivy = *hcivy;
202 }
203 else
204 {
205 node->hcivy = DRMS_MISSING_DOUBLE;
206 }
207 if (hcivz)
208 {
209 node->hcivz = *hcivz;
210 }
211 else
212 {
213 node->hcivz = DRMS_MISSING_DOUBLE;
214 }
215
216 if (hciID && *hciID)
217 {
218 snprintf(node->hciID, kIDMaxLen, "%s", hciID);
219 }
220 else
221 {
222 snprintf(node->hciID, kIDMaxLen, "%s", "unknown");
223 }
224
225 if (gcix)
226 {
227 node->gcix = *gcix;
228 }
229 else
230 {
231 node->gcix = DRMS_MISSING_DOUBLE;
232 }
233 if (gciy)
234 {
235 node->gciy = *gciy;
236 }
237 else
238 {
239 node->gciy = DRMS_MISSING_DOUBLE;
240 }
241 if (gciz)
242 {
243 node->gciz = *gciz;
244 }
245 else
246 {
247 node->gciz = DRMS_MISSING_DOUBLE;
248 }
249 if (gcivx)
250 {
251 node->gcivx = *gcivx;
252 }
253 else
254 {
255 node->gcivx = DRMS_MISSING_DOUBLE;
256 }
257 if (gcivy)
258 {
259 node->gcivy = *gcivy;
260 }
261 else
262 {
263 node->gcivy = DRMS_MISSING_DOUBLE;
264 }
265 if (gcivz)
266 {
267 node->gcivz = *gcivz;
268 }
269 else
270 {
271 node->gcivz = DRMS_MISSING_DOUBLE;
272 }
273
274 if (gciID && *gciID)
275 {
276 snprintf(node->gciID, kIDMaxLen, "%s", gciID);
277 }
278 else
279 {
280 snprintf(node->gciID, kIDMaxLen, "%s", "unknown");
281 }
282 }
283 }
284
285 static VectorNode_t *VCacheLookup(VectorCache_t *cache, const char *tstr)
286 {
287 VectorNode_t *node = NULL;
288
289 if (tstr && *tstr)
290 {
291 node = (VectorNode_t *)hcon_lookup_lower(cache, tstr);
292 }
293
294 return node;
295 }
296
297 static DRMS_Keyword_t *AddKey(DRMS_Record_t *prototype,
298 DRMS_Type_t type,
299 const char *name,
300 const char *format,
301 const char *unit,
302 DRMS_RecScopeType_t scope,
303 const char *desc,
304 int intprime,
305 int extprime,
306 int *rank)
307 {
308 DRMS_Keyword_t *tKey = NULL;
309
310 tKey = hcon_allocslot_lower(&(prototype->keywords), name);
311 XASSERT(tKey);
312 memset(tKey, 0, sizeof(DRMS_Keyword_t));
313 tKey->info = malloc(sizeof(DRMS_KeywordInfo_t));
314 XASSERT(tKey->info);
315 memset(tKey->info, 0, sizeof(DRMS_KeywordInfo_t));
316
317 /* XXX check for invalid input */
318
319 if (tKey && tKey->info)
320 {
321 /* record */
322 tKey->record = prototype;
323
324 /* keyword info */
325 snprintf(tKey->info->name,
326 DRMS_MAXKEYNAMELEN,
327 "%s",
328 name);
329 tKey->info->type = type;
330 snprintf(tKey->info->format, DRMS_MAXFORMATLEN, "%s", format);
331 snprintf(tKey->info->unit, DRMS_MAXUNITLEN, "%s", unit);
332 tKey->info->recscope = scope;
333
334 if (intprime)
335 {
336 drms_keyword_setintprime(tKey);
337 }
338 else
339 {
340 drms_keyword_unsetintprime(tKey);
341 }
342
343 if (extprime)
344 {
345 drms_keyword_setextprime(tKey);
346 }
347 else
348 {
349 drms_keyword_unsetextprime(tKey);
350 }
351
352 snprintf(tKey->info->description, DRMS_MAXCOMMENTLEN, "%s", desc);
353
354 /* default value - missing */
355 drms_missing(type, &(tKey->value));
356
357 /* Set rank - both in rank field and in flags (so that it gets saved into dbase) */
358 tKey->info->rank = (*rank)++;
359 tKey->info->kwflags |= (tKey->info->rank + 1) << 16;
360 }
361
362 return tKey;
363 }
364
365 static void CreateOutSeries(DRMS_Env_t *drmsEnv, char *outSeries, const char *owner, int *status)
366 {
367 int stat = DRMS_SUCCESS;
368 int rank;
369
370 if (!drms_series_exists(drmsEnv, outSeries, &stat))
371 {
372 DRMS_Record_t *prototype = (DRMS_Record_t *)calloc(1, sizeof(DRMS_Record_t));
373
374 if (prototype)
375 {
376 prototype->seriesinfo = calloc(1, sizeof(DRMS_SeriesInfo_t));
377
378 if (prototype->seriesinfo)
379 {
380 DRMS_Keyword_t *pkey = NULL;
381 DRMS_Keyword_t *akey = NULL;
382 char keyname[DRMS_MAXKEYNAMELEN];
383
384 prototype->env = drmsEnv;
385 prototype->recnum = 0;
386 prototype->sunum = -1;
387 prototype->init = 1;
388 prototype->sessionid = 0;
389 prototype->sessionns = NULL;
390 prototype->su = NULL;
391
392 /* Initialize container structure. */
393 hcon_init(&prototype->segments, sizeof(DRMS_Segment_t), DRMS_MAXHASHKEYLEN,
394 (void (*)(const void *)) drms_free_segment_struct,
395 (void (*)(const void *, const void *)) drms_copy_segment_struct);
396 /* Initialize container structures for links. */
397 hcon_init(&prototype->links, sizeof(DRMS_Link_t), DRMS_MAXHASHKEYLEN,
398 (void (*)(const void *)) drms_free_link_struct,
399 (void (*)(const void *, const void *)) drms_copy_link_struct);
400 /* Initialize container structure. */
401 hcon_init(&prototype->keywords, sizeof(DRMS_Keyword_t), DRMS_MAXHASHKEYLEN,
402 (void (*)(const void *)) drms_free_keyword_struct,
403 (void (*)(const void *, const void *)) drms_copy_keyword_struct);
404
405 /* series info */
406 snprintf(prototype->seriesinfo->seriesname,
407 DRMS_MAXSERIESNAMELEN,
408 "%s",
409 outSeries);
410
411 strcpy(prototype->seriesinfo->author, "Art Amezcua");
412
413 /* discard "Owner", fill it with the dbuser */
414 if (drmsEnv->session->db_direct)
415 {
416 strcpy(prototype->seriesinfo->owner, drmsEnv->session->db_handle->dbuser);
417 }
418 else
419 {
420 if (owner && strlen(owner) < DRMS_MAXCOMMENTLEN)
421 {
422 strcpy(prototype->seriesinfo->owner, owner);
423 }
424 else
425 {
426 strcpy(prototype->seriesinfo->owner, "unknown");
427 }
428 }
429
430 prototype->seriesinfo->unitsize = 0;
431 prototype->seriesinfo->archive = 0;
432 prototype->seriesinfo->retention = 0;
433 prototype->seriesinfo->tapegroup = 0;
434
435 snprintf(prototype->seriesinfo->description,
436 DRMS_MAXCOMMENTLEN,
437 "%s",
438 "Helio- and geo-centric orbit position and velocity vectors.");
439
440 /* slotted keyword isdrmsprime, but not really prime */
441 rank = 0;
442 akey = AddKey(prototype, DRMS_TYPE_TIME, kSVKeyPrimary, "0", "UTC", kRecScopeType_TS_EQ, "Date of prediction", 0, 1, &rank);
443
444 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "index");
445 pkey = AddKey(prototype, kIndexKWType, keyname, kIndexKWFormat, "none", kRecScopeType_Index, "Slotted-key index for OBS_DATE", 1, 0, &rank);
446 drms_keyword_setimplicit(pkey);
447
448 /* epoch */
449 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "epoch");
450 akey = AddKey(prototype, DRMS_TYPE_TIME, keyname, "0", "UTC", kRecScopeType_Constant, "MDI epoch - adjusted by 30 seconds to center slots on minutes", 0, 0, &rank);
451 akey->value.time_val = sscan_time("1993.01.01_00:00:30_UT");
452
453 /* step */
454 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "step");
455 akey = AddKey(prototype, DRMS_TYPE_TIME, keyname, "%f", "secs", kRecScopeType_Constant, "Slots are 60 seconds wide", 0, 0, &rank);
456 akey->value.time_val = 60.0;
457
458 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyXHELIO, "%f", "km", kRecScopeType_Variable, "X position", 0, 0, &rank);
459 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyYHELIO, "%f", "km", kRecScopeType_Variable, "Y position", 0, 0, &rank);
460 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyZHELIO, "%f", "km", kRecScopeType_Variable, "Z position", 0, 0, &rank);
461 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVxHELIO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the X direction", 0, 0, &rank);
462 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVyHELIO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the Y direction", 0, 0, &rank);
463 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVzHELIO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the Z direction", 0, 0, &rank);
464
465 akey = AddKey(prototype, DRMS_TYPE_STRING, kSVKey_idHELIO, "%s", "NA", kRecScopeType_Variable, "Record query to identify source file containing heliocentric data", 0, 0, &rank);
466 copy_string(&(akey->value.string_val), "Unknown");
467
468 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyXGEO, "%f", "km", kRecScopeType_Variable, "X position", 0, 0, &rank);
469 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyYGEO, "%f", "km", kRecScopeType_Variable, "Y position", 0, 0, &rank);
470 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyZGEO, "%f", "km", kRecScopeType_Variable, "Z position", 0, 0, &rank);
471 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVxGEO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the X direction", 0, 0, &rank);
472 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVyGEO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the Y direction", 0, 0, &rank);
473 AddKey(prototype, DRMS_TYPE_DOUBLE, kSVKeyVzGEO, "%f", "km/sec", kRecScopeType_Variable, "Velocity in the Z direction", 0, 0, &rank);
474
475 akey = AddKey(prototype, DRMS_TYPE_STRING, kSVKey_idGEO, "%s", "NA", kRecScopeType_Variable, "Record query to identify source file containing geocentric data", 0, 0, &rank);
476 copy_string(&(akey->value.string_val), "Unknown");
477
478 /* DATE keyword that indicates date of record creation. */
479 AddKey(prototype, DRMS_TYPE_TIME, kSVKeyDATE, "0", "ISO", kRecScopeType_Variable, "Date of orbit-data ingestion; ISO 86\
480 01", 0, 0, &rank);
481
482 prototype->seriesinfo->pidx_keywords[0] = pkey;
483 prototype->seriesinfo->pidx_num = 1;
484 }
485
486 stat = drms_create_series_fromprototype(&prototype, outSeries, 0);
487 }
488 }
489
490 if (status)
491 {
492 *status = stat;
493 }
494 }
495
496 static void CreateHistSeries(DRMS_Env_t *drmsEnv, char *histSeries, const char *owner, int *status)
497 {
498 int stat = DRMS_SUCCESS;
499 int rank;
500
501 if (!drms_series_exists(drmsEnv, histSeries, &stat))
502 {
503 DRMS_Record_t *prototype = (DRMS_Record_t *)calloc(1, sizeof(DRMS_Record_t));
504
505 if (prototype)
506 {
507 prototype->seriesinfo = calloc(1, sizeof(DRMS_SeriesInfo_t));
508
509 if (prototype->seriesinfo)
510 {
511 DRMS_Keyword_t *pkey1 = NULL;
512 DRMS_Keyword_t *pkey2 = NULL;
513 DRMS_Keyword_t *pkey3 = NULL;
514 DRMS_Keyword_t *pkey4 = NULL;
515 DRMS_Keyword_t *akey = NULL;
516 char keyname[DRMS_MAXKEYNAMELEN];
517
518 prototype->env = drmsEnv;
519 prototype->recnum = 0;
520 prototype->sunum = -1;
521 prototype->init = 1;
522 prototype->sessionid = 0;
523 prototype->sessionns = NULL;
524 prototype->su = NULL;
525
526 /* Initialize container structure. */
527 hcon_init(&prototype->segments, sizeof(DRMS_Segment_t), DRMS_MAXHASHKEYLEN,
528 (void (*)(const void *)) drms_free_segment_struct,
529 (void (*)(const void *, const void *)) drms_copy_segment_struct);
530 /* Initialize container structures for links. */
531 hcon_init(&prototype->links, sizeof(DRMS_Link_t), DRMS_MAXHASHKEYLEN,
532 (void (*)(const void *)) drms_free_link_struct,
533 (void (*)(const void *, const void *)) drms_copy_link_struct);
534 /* Initialize container structure. */
535 hcon_init(&prototype->keywords, sizeof(DRMS_Keyword_t), DRMS_MAXHASHKEYLEN,
536 (void (*)(const void *)) drms_free_keyword_struct,
537 (void (*)(const void *, const void *)) drms_copy_keyword_struct);
538
539 /* series info */
540 snprintf(prototype->seriesinfo->seriesname,
541 DRMS_MAXSERIESNAMELEN,
542 "%s",
543 histSeries);
544
545 strcpy(prototype->seriesinfo->author, "Art Amezcua");
546
547 /* discard "Owner", fill it with the dbuser */
548 if (drmsEnv->session->db_direct)
549 {
550 strcpy(prototype->seriesinfo->owner, drmsEnv->session->db_handle->dbuser);
551 }
552 else
553 {
554 if (owner && strlen(owner) < DRMS_MAXCOMMENTLEN)
555 {
556 strcpy(prototype->seriesinfo->owner, owner);
557 }
558 else
559 {
560 strcpy(prototype->seriesinfo->owner, "unknown");
561 }
562 }
563
564 prototype->seriesinfo->unitsize = 0;
565 prototype->seriesinfo->archive = 0;
566 prototype->seriesinfo->retention = 0;
567 prototype->seriesinfo->tapegroup = 0;
568
569 snprintf(prototype->seriesinfo->description,
570 DRMS_MAXCOMMENTLEN,
571 "%s",
572 "Ingestion history of helio- and geo-centric orbit prediction data.");
573
574 /* slotted keyword isdrmsprime, but not really prime */
575 rank = 0;
576 akey = AddKey(prototype, DRMS_TYPE_TIME, kObsDateKey, "0", "UTC", kRecScopeType_TS_EQ, "Date embedded in orbit file name", 0, 1, &rank);
577
578 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "index");
579 pkey1 = AddKey(prototype, kIndexKWType, keyname, kIndexKWFormat, "none", kRecScopeType_Index, "Slotted-key index for OBS_DATE", 1, 0, &rank);
580 drms_keyword_setimplicit(pkey1);
581
582 /* epoch */
583 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "epoch");
584 akey = AddKey(prototype, DRMS_TYPE_TIME, keyname, "0", "UTC", kRecScopeType_Constant, "MDI epoch - adjusted by 12 hours to center slots on days.", 0, 0, &rank);
585 akey->value.time_val = sscan_time("1993.01.01_12:00:00_UT");
586
587 /* step */
588 snprintf(keyname, sizeof(keyname), "%s_%s", kSVKeyPrimary, "step");
589 akey = AddKey(prototype, DRMS_TYPE_TIME, keyname, "%f", "day", kRecScopeType_Constant, "Slots are 1 day wide", 0, 0, &rank);
590 akey->value.time_val = 86400.0;
591
592 pkey2 = AddKey(prototype, DRMS_TYPE_STRING, kFdsProductCompKey, "%s", "none", kRecScopeType_Variable, "FDS data-product component", 1, 1, &rank);
593 pkey3 = AddKey(prototype, DRMS_TYPE_STRING, kFdsDataFormatKey, "%s", "none", kRecScopeType_Variable, "Format of data file contained in segment", 1, 1, &rank);
594 pkey4 = AddKey(prototype, DRMS_TYPE_INT, kFileVersionKey, "%d", "none", kRecScopeType_Variable, "FDS data product", 1, 1, &rank);
595
596 prototype->seriesinfo->pidx_keywords[0] = pkey1;
597 prototype->seriesinfo->pidx_keywords[1] = pkey2;
598 prototype->seriesinfo->pidx_keywords[2] = pkey3;
599 prototype->seriesinfo->pidx_keywords[3] = pkey4;
600 prototype->seriesinfo->pidx_num = 4;
601 }
602
603 stat = drms_create_series_fromprototype(&prototype, histSeries, 0);
604 }
605 }
606
607 if (status)
608 {
609 *status = stat;
610 }
611 }
612
613 static void CloseCachedRecords(HContainer_t **cache)
614 {
615 if (cache && *cache)
616 {
617 DRMS_RecordSet_t *rs = *((DRMS_RecordSet_t **)hcon_lookup(*cache, "bobmould"));
618 drms_close_records(rs, DRMS_FREE_RECORD);
619 rs = NULL;
620
621 /* destory cache */
622 hcon_destroy(cache);
623 }
624 }
625
626 /* This uses record-chunking to find the correct record. Assumes records are in increasing time order,
627 * and calls to this function have increaing tbuf */
628 /* Caller must clean up records by calling CloseCachedRecords() */
629 static int FetchCachedRecord(DRMS_Env_t *drmsEnv,
630 char *series,
631 char *tbuf,
632 HContainer_t **cache,
633 DRMS_Record_t **recout)
634 {
635 int error = 0;
636 DRMS_RecordSet_t *rs = NULL;
637 int stop;
638 int rehydrated;
639 DRMS_Record_t **prec = NULL;
640 DRMS_Record_t *rec = NULL;
641 char *timestr = NULL;
642 int status = DRMS_SUCCESS;
643 DRMS_RecChunking_t cstat = kRecChunking_None;
644 int nrecs;
645
646 if (!cache || !recout)
647 {
648 error = 1;
649 }
650 else
651 {
652 *recout = NULL;
653
654 if (*cache == NULL)
655 {
656 drms_recordset_setchunksize(gChunkSize);
657 rs = drms_open_recordset(drmsEnv, series, &status);
658
659 if (status != DRMS_SUCCESS || rs == NULL)
660 {
661 fprintf(stderr, "drms_open_recordset() failed, query=%s, error=%d. Aborting.\n", series, status);
662 }
663 else
664 {
665 nrecs = drms_count_records(drmsEnv, series, &status);
666
667 if (status != DRMS_SUCCESS)
668 {
669 fprintf(stderr, "drms_count_records() failed, query=%s, error=%d. Aborting.\n", series, status);
670 }
671 }
672 }
673 /* attempt to find the record that matches tbuf */
674 else
675 {
676 /* retrieve saved rs */
677 rs = *((DRMS_RecordSet_t **)hcon_lookup(*cache, "bobmould"));
678
679 if (nrecs > 0)
680 {
681 if ((prec = (DRMS_Record_t **)hcon_lookup(*cache, tbuf)) != NULL)
682 {
683 *recout = *prec;
684 }
685 }
686 }
687
688 if (nrecs > 0)
689 {
690 /* Have to loop on ALL chunks if can't find time in cache */
691 int morerecs = 1;
692 int newchunk;
693
694 while (!prec && !error && morerecs)
695 {
696 /* either no cache, or cache, but miss - get more records, then try to find tbuf */
697 stop = 0;
698 rehydrated = 0;
699
700 /* must not call fetchnext() if previous call retrieved last rec in chunk, otherwise
701 * fetchnext() will blow away chunk, but the *cache will still point to recs
702 * in the cache. */
703 newchunk = 0;
704 while (!stop && (rec = drms_recordset_fetchnext(drmsEnv, rs, &status, &cstat, &newchunk)) != NULL)
705 {
706 if (newchunk)
707 {
708 if (*cache)
709 {
710 hcon_destroy(cache);
711 }
712
713 *cache = hcon_create(sizeof(DRMS_Record_t *), gCacheKeySize, NULL, NULL, NULL, NULL, 0);
714
715 /* insert recordset */
716 hcon_insert(*cache, "bobmould", &rs);
717 }
718
719 if (cstat == kRecChunking_LastInChunk || cstat == kRecChunking_LastInRS)
720 {
721 /* this record was the last in chunk - stop caching */
722 stop = 1;
723 }
724
725 /* insert, using the obs_date keyword value */
726 timestr = drms_getkey_string(rec, kObsDateKey, &status);
727 if (timestr)
728 {
729 hcon_insert(*cache, timestr, &rec);
730 rehydrated = 1;
731 }
732 else
733 {
734 error = 1;
735 break;
736 }
737 }
738
739 if (cstat == kRecChunking_NoMoreRecs)
740 {
741 morerecs = 0;
742 }
743
744 /* Try to find tbuf again */
745 if (rehydrated)
746 {
747 if ((prec = (DRMS_Record_t **)hcon_lookup(*cache, tbuf)) != NULL)
748 {
749 *recout = *prec;
750 }
751 }
752 }
753 }
754 else if (*cache == NULL)
755 {
756 *cache = hcon_create(sizeof(DRMS_Record_t *), gCacheKeySize, NULL, NULL, NULL, NULL, 0);
757
758 /* insert recordset */
759 hcon_insert(*cache, "bobmould", &rs);
760 }
761 }
762
763 #if DEBUG
764 if (!prec)
765 {
766 fprintf(stderr, "Record for time '%s' not found in series '%s'.\n", tbuf, series);
767 }
768 #endif
769
770 return error;
771 }
772
/* Parse one data line of an FDS orbit-vector file.
 *
 * Line format: whitespace-separated fields (space, tab, CR or LF), namely a time token
 * followed by six numbers x, y, z, vx, vy, vz.
 *
 * The time token is read by position:
 * - characters 0-3: year;
 * - characters 4-6: day number;
 * - character 7: ignored;
 * - characters 8-9: hour;
 * - characters 10-11: minute;
 * - characters 12-13: second.
 * It is rewritten as "YYYY.01.DDD_HH:MM:SS_UT", i.e. day DDD of January. This is the
 * form the DRMS time parser is relied on to accept (and presumably normalize). A token
 * shorter than 14 characters is rejected; the original read past its end.
 *
 * recBuf        - the line (not modified; a private copy is tokenized).
 * date          - out; receives a newly malloc'd time string once the time token has
 *                 been parsed. The caller must free() it. Untouched if the token is
 *                 missing or too short.
 * xVal .. vzVal - out; the six numeric fields, in order.
 *
 * Returns 0 if all seven fields were parsed, 1 otherwise. Values parsed before the
 * failing field are still stored. The original leaked its copy of the line, ignored
 * sscanf() failures, and returned 0 for a line with no tokens. */
static int ParseSVRecFields(char *recBuf, char **date, double *xVal, double *yVal, double *zVal,
                            double *vxVal, double *vyVal, double *vzVal)
{
    /* Field separators; CR/LF included so the last field does not carry the line ending. */
    static const char kDelims[] = " \t\r\n";
    /* Minimum length of the time token (YYYYDDD?HHMMSS). */
    enum { kTimeTokenMinLen = 14 };

    double *values[6] = { xVal, yVal, zVal, vxVal, vyVal, vzVal };
    char *line = NULL;
    char *token = NULL;
    size_t len = 0;
    int error = 0;
    int ival = 0;

    if (recBuf == NULL || date == NULL)
    {
        return 1;
    }

    /* strtok() writes into its argument, so tokenize a private copy of the line. */
    len = strlen(recBuf);
    line = malloc(len + 1);
    if (line == NULL)
    {
        return 1;
    }
    memcpy(line, recBuf, len + 1);

    token = strtok(line, kDelims);

    if (token == NULL || strlen(token) < kTimeTokenMinLen)
    {
        error = 1;
    }
    else
    {
        /* Must convert date to something that drms recognizes (calendar date) */
        char timeBuf[64];
        size_t tlen = 0;
        char *dateStr = NULL;

        snprintf(timeBuf, sizeof(timeBuf), "%.4s.01.%.3s_%.2s:%.2s:%.2s_UT",
                 token, token + 4, token + 8, token + 10, token + 12);

        tlen = strlen(timeBuf);
        dateStr = malloc(tlen + 1);
        if (dateStr == NULL)
        {
            error = 1;
        }
        else
        {
            memcpy(dateStr, timeBuf, tlen + 1);
            *date = dateStr;
        }
    }

    /* The six state-vector components, each of which must be a number. */
    for (ival = 0; !error && ival < 6; ival++)
    {
        token = strtok(NULL, kDelims);
        if (token == NULL || sscanf(token, "%lf", values[ival]) != 1)
        {
            error = 1;
        }
    }

    free(line);
    return error;
}
886
/* Decide whether two keyword values differ.
 *
 * Missingness is tested with drms_ismissing_double():
 * - two missing values are equal;
 * - a missing and a non-missing value differ;
 * - two present values differ when |v1 - v2| exceeds 1e-11 * (|v1| + |v2|), a relative
 *   tolerance that absorbs round-off.
 *
 * Returns 1 if the values differ, 0 otherwise. */
static int IsDiff(double v1, double v2)
{
    const int v1Missing = drms_ismissing_double(v1);
    const int v2Missing = drms_ismissing_double(v2);

    if (v1Missing || v2Missing)
    {
        /* Equal only when both are missing. */
        return !(v1Missing && v2Missing);
    }

    return fabs(v1 - v2) > 1.0e-11 * (fabs(v1) + fabs(v2));
}
900
901 static int ExtractStateVectors(DRMS_Env_t *drmsEnv,
902 char *filePathHELIO,
903 char *idHELIO,
904 char *filePathGEO,
905 char *idGEO,
906 char *outSeries,
907 const char *owner)
908 {
909 int stat = DRMS_SUCCESS;
910 int error = 0;
911 char *obsDate = NULL;
912 double xValHel;
913 double yValHel;
914 double zValHel;
915 double vxValHel;
916 double vyValHel;
917 double vzValHel;
918 double xValGeo;
919 double yValGeo;
920 double zValGeo;
921 double vxValGeo;
922 double vyValGeo;
923 double vzValGeo;
924
925 int addedRecsHELIO = 0;
926 int addedRecsGEO = 0;
927
928 HContainer_t *existRCache = NULL;
929 DRMS_Record_t *existRec = NULL;
930 VectorCache_t *helioOutVCache = NULL;
931 VectorCache_t *outVCache = NULL;
932
933 outVCache = CreateVectorCache();
934
935 #if DEBUG
936 Hash_Table_t hash;
937 hash_init(&hash, 49999, 0, (int (*)(const void *, const void *))strcmp, hash_universal_hash);
938 #endif
939
940 #if DEBUG
941 int throttle = 0;
942 #endif
943
944 /* Read heliocentric data from filePathHELIO one line at a time */
945 FILE *datafp = NULL;
946
947 if (filePathHELIO);
948 {
949 datafp = fopen(filePathHELIO, "r");
950 if (datafp == NULL)
951 {
952 error = 1;
953 fprintf(stderr, "Could not open %s for reading\n", filePathHELIO);
954 }
955 }
956
957 if (datafp)
958 {
959 char lineBuf[kSVLineMax];
960 int oneMore = -1;
961
962 CreateOutSeries(drmsEnv, outSeries, owner, &stat);
963
964 if (filePathGEO)
965 {
966 /* If we are ingesting both geo and helio data, then we need to
967 * pair up the two into records (pair according to prime key
968 * value). Put these heliocentric data into a cache.
969 * and then when the geocentric data are available
970 * fetch from the cache and match them up. */
971 //snprintf(sbox, sizeof(sbox), "%s_sbox", outSeries);
972 //CreateOutSeries(drmsEnv, sbox, &stat);
973 helioOutVCache = CreateVectorCache();
974 }
975
976 /* Parsing HELIO data. */
977 #if DEBUG
978 TIMER_t *timer = CreateTimer();
979 int nitems = 0;
980 #endif
981
982 while (!error && fgets(lineBuf, kSVLineMax, datafp) != NULL)
983 {
984 if (oneMore == -1)
985 {
986 if (strlen(lineBuf) >= 4)
987 {
988 if (strncmp(lineBuf, "Time", 4) == 0)
989 {
990 oneMore = 1;
991 }
992 }
993
994 continue;
995 }
996
997 if (oneMore > 0)
998 {
999 oneMore--;
1000 continue;
1001 }
1002
1003 ParseSVRecFields(lineBuf,
1004 &obsDate,
1005 &xValHel,
1006 &yValHel,
1007 &zValHel,
1008 &vxValHel,
1009 &vyValHel,
1010 &vzValHel);
1011
1012 TIME od = sscan_time(obsDate);
1013 char tbuf[128];
1014 sprint_time(tbuf, od, "UTC", 0);
1015
1016 if (helioOutVCache)
1017 {
1018 #if DEBUG
1019 if (nitems < 10000)
1020 {
1021 #endif
1022
1023 #if DEBUG
1024 /* try just hashing, and don't worry about leaking the key */
1025 char *buff = malloc(64);
1026 snprintf(buff, 64, "test%d", nitems);
1027 TIMER_t *timer10 = CreateTimer();
1028 hash_insert(&hash, buff, (void *)(buff));
1029 hash_lookup(&hash, buff);
1030 fprintf(stdout, "hash insert + lookup %f seconds.\n", GetElapsedTime(timer10));
1031 DestroyTimer(&timer10);
1032 #else
1033 VCacheCache(helioOutVCache,
1034 tbuf,
1035 &xValHel,
1036 &yValHel,
1037 &zValHel,
1038 &vxValHel,
1039 &vyValHel,
1040 &vzValHel,
1041 idHELIO,
1042 NULL,
1043 NULL,
1044 NULL,
1045 NULL,
1046 NULL,
1047 NULL,
1048 NULL);
1049 #endif
1050
1051 #if DEBUG
1052 }
1053 #endif
1054 }
1055 else
1056 {
1057 /* Before creating a new output record, check to see if data have changed. There
1058 * exist HELIO input data only. */
1059
1060 /* Use FetchCachedRecord() directly on the orbit-vector series -
1061 * this avoids multiple requests to psql; must request times in
1062 * ascending order to use FetchCachedRecord(). */
1063
1064 /* The first time this call is made, a db cursor is created. There is
1065 * no need to fetch records older than the time string in tbuf (which
1066 * corresponds to the time - double - in od), provided that tbuf
1067 * is monotonically increasing in this loop. Provide a filter that
1068 * limits the number of DRMS records cached to speed things up. */
1069 char filtered[DRMS_MAXQUERYLEN];
1070 snprintf(filtered, sizeof(filtered), "%s[? %s >= $(%s) ?]", outSeries, kObsDateKey, tbuf);
1071
1072 if (FetchCachedRecord(drmsEnv, filtered, tbuf, &existRCache, &existRec))
1073 {
1074 double xValHelSav = drms_getkey_double(existRec, kSVKeyXHELIO, &stat);
1075 double yValHelSav = drms_getkey_double(existRec, kSVKeyYHELIO, &stat);
1076 double zValHelSav = drms_getkey_double(existRec, kSVKeyZHELIO, &stat);
1077 double vxValHelSav = drms_getkey_double(existRec, kSVKeyVxHELIO, &stat);
1078 double vyValHelSav = drms_getkey_double(existRec, kSVKeyVyHELIO, &stat);
1079 double vzValHelSav = drms_getkey_double(existRec, kSVKeyVzHELIO, &stat);
1080
1081 char *idh = drms_getkey_string(existRec, kSVKey_idHELIO, &stat);
1082
1083 double xValGeoSav = drms_getkey_double(existRec, kSVKeyXGEO, &stat);
1084 double yValGeoSav = drms_getkey_double(existRec, kSVKeyYGEO, &stat);
1085 double zValGeoSav = drms_getkey_double(existRec, kSVKeyZGEO, &stat);
1086 double vxValGeoSav = drms_getkey_double(existRec, kSVKeyVxGEO, &stat);
1087 double vyValGeoSav = drms_getkey_double(existRec, kSVKeyVyGEO, &stat);
1088 double vzValGeoSav = drms_getkey_double(existRec, kSVKeyVzGEO, &stat);
1089
1090 char *id = drms_getkey_string(existRec, kSVKey_idGEO, &stat);
1091
1092 if (IsDiff(xValHelSav, xValHel) || IsDiff(yValHelSav, yValHel) ||
1093 IsDiff(zValHelSav, zValHel) || IsDiff(vxValHelSav, vxValHel) ||
1094 IsDiff(vyValHelSav, vyValHel) || IsDiff(vzValHelSav, vzValHel))
1095 {
1096 /* Difference in HELIO values exists - make record with new helio
1097 * values, but old geo values. */
1098 VCacheCache(outVCache,
1099 tbuf,
1100 &xValHel,
1101 &yValHel,
1102 &zValHel,
1103 &vxValHel,
1104 &vyValHel,
1105 &vzValHel,
1106 idHELIO,
1107 &xValGeoSav,
1108 &yValGeoSav,
1109 &zValGeoSav,
1110 &vxValGeoSav,
1111 &vyValGeoSav,
1112 &vzValGeoSav,
1113 id);
1114 }
1115
1116 if (idh)
1117 {
1118 free(idh);
1119 }
1120
1121 if (id)
1122 {
1123 free(id);
1124 }
1125 }
1126 else
1127 {
1128 /* No geo input file, and no previously existing record in outSeries -
1129 * add helio data. */
1130 VCacheCache(outVCache,
1131 tbuf,
1132 &xValHel,
1133 &yValHel,
1134 &zValHel,
1135 &vxValHel,
1136 &vyValHel,
1137 &vzValHel,
1138 idHELIO,
1139 NULL,
1140 NULL,
1141 NULL,
1142 NULL,
1143 NULL,
1144 NULL,
1145 NULL);
1146 }
1147 }
1148
1149 if (!filePathGEO)
1150 {
1151 addedRecsHELIO++;
1152 }
1153
1154 #if DEBUG
1155 nitems++;
1156 #endif
1157
1158 #if DEBUG
1159 throttle++;
1160 #endif
1161
1162 #if DEBUG
1163 /* Just to speed things up during debugging */
1164 if (throttle == 10)
1165 {
1166 break;
1167 }
1168 #endif
1169
1170 } /* end while */
1171
1172 #if DEBUG
1173 fprintf(stdout, "Helio ingest time: %f seconds.\n", GetElapsedTime(timer));
1174 DestroyTimer(&timer);
1175 #endif
1176
1177 fclose(datafp);
1178 datafp = NULL;
1179 } /* end helio-file processing */
1180
1181 /* Cannot use existing existRCache since it was populated from times
1182 * from the helio data file, and the geo data file may have different
1183 * times. */
1184 CloseCachedRecords(&existRCache);
1185 existRec = NULL;
1186
1187 /* Geocentric orbit files - save these data in the real series, along with the heliocentric
1188 * data from sbox series. */
1189 if (error == 0);
1190 {
1191 if (filePathGEO)
1192 {
1193 datafp = fopen(filePathGEO, "r");
1194 if (datafp == NULL)
1195 {
1196 error = 1;
1197 fprintf(stderr, "Could not open %s for reading\n", filePathGEO);
1198 }
1199 }
1200 }
1201
1202 if (datafp)
1203 {
1204 char lineBuf[kSVLineMax];
1205 int oneMore = -1;
1206 int helioexist;
1207
1208 #if DEBUG
1209 TIMER_t *timer = CreateTimer();
1210 #endif
1211
1212 while (!error && fgets(lineBuf, kSVLineMax, datafp) != NULL)
1213 {
1214 if (oneMore == -1)
1215 {
1216 if (strlen(lineBuf) >= 4)
1217 {
1218 if (strncmp(lineBuf, "Time", 4) == 0)
1219 {
1220 oneMore = 1;
1221 }
1222 }
1223
1224 continue;
1225 }
1226
1227 if (oneMore > 0)
1228 {
1229 oneMore--;
1230 continue;
1231 }
1232
1233 ParseSVRecFields(lineBuf,
1234 &obsDate,
1235 &xValGeo,
1236 &yValGeo,
1237 &zValGeo,
1238 &vxValGeo,
1239 &vyValGeo,
1240 &vzValGeo);
1241
1242 /* Find corresponding heliocentric data, if they exist */
1243 helioexist = 0;
1244 char *idHELIOtmp = NULL;
1245 VectorNode_t *hrec = NULL;
1246
1247 TIME od = sscan_time(obsDate);
1248 char tbuf[128];
1249 sprint_time(tbuf, od, "UTC", 0);
1250
1251 if (helioOutVCache)
1252 {
1253 hrec = VCacheLookup(helioOutVCache, tbuf);
1254 }
1255
1256 /* Get heliocentric data, if they are also being ingested */
1257 if (hrec)
1258 {
1259 xValHel = hrec->hcix;
1260 yValHel = hrec->hciy;
1261 zValHel = hrec->hciz;
1262 vxValHel = hrec->hcivx;
1263 vyValHel = hrec->hcivy;
1264 vzValHel = hrec->hcivz;
1265 idHELIOtmp = hrec->hciID;
1266 helioexist = 1;
1267 }
1268
1269 /* Before creating a new output record, check to see if data have changed.
1270 * There is GEO input, and there is HELIO input if helioexist == 1. */
1271
1272 // DRMS_RecordSet_t *rssav = drms_open_records(drmsEnv, query, &stat);
1273 /* Use new record-chunking way. */
1274 //DRMS_RecordSet_t *rssav = drms_open_recordset(drmsEnv, query, &stat);
1275 //FetchCachedRecord(drmsEnv, outSeries, tbuf, &orbitcache, &savrec);
1276
1277
1278 /* The first time this call is made, a db cursor is created. There is
1279 * no need to fetch records older than the time string in tbuf (which
1280 * corresponds to the time - double - in od), provided that tbuf
1281 * is monotonically increasing in this loop. Provide a filter that
1282 * limits the number of DRMS records cached to speed things up. */
1283 char filtered[DRMS_MAXQUERYLEN];
1284 snprintf(filtered, sizeof(filtered), "%s[? %s >= $(%s) ?]", outSeries, kObsDateKey, tbuf);
1285
1286 if (FetchCachedRecord(drmsEnv, filtered, tbuf, &existRCache, &existRec))
1287 {
1288 int heliodiff = 0;
1289 int geodiff = 0;
1290
1291 double xValHelSav = drms_getkey_double(existRec, kSVKeyXHELIO, &stat);
1292 double yValHelSav = drms_getkey_double(existRec, kSVKeyYHELIO, &stat);
1293 double zValHelSav = drms_getkey_double(existRec, kSVKeyZHELIO, &stat);
1294 double vxValHelSav = drms_getkey_double(existRec, kSVKeyVxHELIO, &stat);
1295 double vyValHelSav = drms_getkey_double(existRec, kSVKeyVyHELIO, &stat);
1296 double vzValHelSav = drms_getkey_double(existRec, kSVKeyVzHELIO, &stat);
1297
1298 char *id = drms_getkey_string(existRec, kSVKey_idHELIO, &stat);
1299
1300 double xValGeoSav = drms_getkey_double(existRec, kSVKeyXGEO, &stat);
1301 double yValGeoSav = drms_getkey_double(existRec, kSVKeyYGEO, &stat);
1302 double zValGeoSav = drms_getkey_double(existRec, kSVKeyZGEO, &stat);
1303 double vxValGeoSav = drms_getkey_double(existRec, kSVKeyVxGEO, &stat);
1304 double vyValGeoSav = drms_getkey_double(existRec, kSVKeyVyGEO, &stat);
1305 double vzValGeoSav = drms_getkey_double(existRec, kSVKeyVzGEO, &stat);
1306
1307 char *idg = drms_getkey_string(existRec, kSVKey_idGEO, &stat);
1308
1309 /* Some of these HELIO and GEO data could be missing. */
1310 geodiff = (IsDiff(xValGeoSav, xValGeo) || IsDiff(yValGeoSav, yValGeo) ||
1311 IsDiff(zValGeoSav, zValGeo) || IsDiff(vxValGeoSav, vxValGeo) ||
1312 IsDiff(vyValGeoSav, vyValGeo) || IsDiff(vzValGeoSav, vzValGeo));
1313
1314 if (helioexist)
1315 {
1316 /* compare existRec to content of filePathHELIO */
1317 heliodiff = (IsDiff(xValHelSav, xValHel) || IsDiff(yValHelSav, yValHel) ||
1318 IsDiff(zValHelSav, zValHel) || IsDiff(vxValHelSav, vxValHel) ||
1319 IsDiff(vyValHelSav, vyValHel) || IsDiff(vzValHelSav, vzValHel));
1320 }
1321
1322 if (geodiff)
1323 {
1324 /* Difference in values exists */
1325 if (!helioexist || !heliodiff)
1326 {
1327 /* Difference in one or more GEO values, and no HELIO input file */
1328 VCacheCache(outVCache,
1329 tbuf,
1330 &xValHelSav,
1331 &yValHelSav,
1332 &zValHelSav,
1333 &vxValHelSav,
1334 &vyValHelSav,
1335 &vzValHelSav,
1336 id,
1337 &xValGeo,
1338 &yValGeo,
1339 &zValGeo,
1340 &vxValGeo,
1341 &vyValGeo,
1342 &vzValGeo,
1343 idGEO);
1344 }
1345 else
1346 {
1347 /* There is a helio input file, and one or more values in that file
1348 * differ from the values in outSeries. */
1349 VCacheCache(outVCache,
1350 tbuf,
1351 &xValHel,
1352 &yValHel,
1353 &zValHel,
1354 &vxValHel,
1355 &vyValHel,
1356 &vzValHel,
1357 idHELIOtmp,
1358 &xValGeo,
1359 &yValGeo,
1360 &zValGeo,
1361 &vxValGeo,
1362 &vyValGeo,
1363 &vzValGeo,
1364 idGEO);
1365 }
1366 }
1367 else
1368 {
1369 /* The geo file's value are identical to the ones in outSeries -
1370 * save a record if the helio ones differ. */
1371 if (helioexist && heliodiff)
1372 {
1373 VCacheCache(outVCache,
1374 tbuf,
1375 &xValHel,
1376 &yValHel,
1377 &zValHel,
1378 &vxValHel,
1379 &vyValHel,
1380 &vzValHel,
1381 idHELIOtmp,
1382 &xValGeoSav,
1383 &yValGeoSav,
1384 &zValGeoSav,
1385 &vxValGeoSav,
1386 &vyValGeoSav,
1387 &vzValGeoSav,
1388 idg);
1389 }
1390 }
1391
1392 if (id)
1393 {
1394 free(id);
1395 }
1396
1397 if (idg)
1398 {
1399 free(idg);
1400 }
1401 }
1402 else
1403 {
1404 /* No previously existing record in outSeries */
1405 if (helioexist)
1406 {
1407 VCacheCache(outVCache,
1408 tbuf,
1409 &xValHel,
1410 &yValHel,
1411 &zValHel,
1412 &vxValHel,
1413 &vyValHel,
1414 &vzValHel,
1415 idHELIOtmp,
1416 &xValGeo,
1417 &yValGeo,
1418 &zValGeo,
1419 &vxValGeo,
1420 &vyValGeo,
1421 &vzValGeo,
1422 idGEO);
1423 }
1424 else
1425 {
1426 VCacheCache(outVCache,
1427 tbuf,
1428 NULL,
1429 NULL,
1430 NULL,
1431 NULL,
1432 NULL,
1433 NULL,
1434 NULL,
1435 &xValGeo,
1436 &yValGeo,
1437 &zValGeo,
1438 &vxValGeo,
1439 &vyValGeo,
1440 &vzValGeo,
1441 idGEO);
1442 }
1443 }
1444
1445 addedRecsGEO++;
1446
1447 if (helioexist)
1448 {
1449 addedRecsHELIO++;
1450 }
1451
1452 #if DEBUG
1453 throttle--;
1454 if (throttle == 0)
1455 {
1456 break;
1457 }
1458 #endif
1459 } /* while */
1460
1461 #if DEBUG
1462 fprintf(stdout, "Geo ingest time: %f seconds.\n", GetElapsedTime(timer));
1463 DestroyTimer(&timer);
1464 #endif
1465
1466 fclose(datafp);
1467 datafp = NULL;
1468 } /* end geo-file processing */
1469
1470 CloseCachedRecords(&existRCache);
1471
1472 /* Now, write all cached vectors into the output series. */
1473 HIterator_t *cachehit = hiter_create((HContainer_t *)outVCache);
1474 DRMS_RecordSet_t *rsout = drms_create_records(drmsEnv,
1475 outVCache->num_total,
1476 outSeries,
1477 DRMS_PERMANENT,
1478 &stat);
1479 DRMS_Record_t *recout = NULL;
1480 int irec = 0;
1481 VectorNode_t *node = NULL;
1482
1483 if (rsout && rsout->n > 0)
1484 {
1485 while ((node = (VectorNode_t *)hiter_getnext(cachehit)) != NULL)
1486 {
1487 recout = rsout->records[irec];
1488
1489 drms_setkey_string(recout, kSVKeyPrimary, node->tobs);
1490 drms_setkey_double(recout, kSVKeyXHELIO, node->hcix);
1491 drms_setkey_double(recout, kSVKeyYHELIO, node->hciy);
1492 drms_setkey_double(recout, kSVKeyZHELIO, node->hciz);
1493 drms_setkey_double(recout, kSVKeyVxHELIO, node->hcivx);
1494 drms_setkey_double(recout, kSVKeyVyHELIO, node->hcivy);
1495 drms_setkey_double(recout, kSVKeyVzHELIO, node->hcivz);
1496 drms_setkey_string(recout, kSVKey_idHELIO, node->hciID);
1497
1498 drms_setkey_double(recout, kSVKeyXGEO, node->gcix);
1499 drms_setkey_double(recout, kSVKeyYGEO, node->gciy);
1500 drms_setkey_double(recout, kSVKeyZGEO, node->gciz);
1501 drms_setkey_double(recout, kSVKeyVxGEO, node->gcivx);
1502 drms_setkey_double(recout, kSVKeyVyGEO, node->gcivy);
1503 drms_setkey_double(recout, kSVKeyVzGEO, node->gcivz);
1504 drms_setkey_string(recout, kSVKey_idGEO, node->gciID);
1505
1506 /* Date of record creation. */
1507 drms_keyword_setdate(recout);
1508
1509 irec++;
1510 }
1511
1512 error = (drms_close_records(rsout, DRMS_INSERT_RECORD) != DRMS_SUCCESS);
1513 }
1514 else
1515 {
1516 fprintf(stderr, "Failed to create records in series '%s'.\n", outSeries);
1517 error = 1;
1518 }
1519
1520 DestroyVectorCache(&outVCache);
1521 DestroyVectorCache(&helioOutVCache);
1522
1523 if (!error)
1524 {
1525 if (filePathHELIO && filePathGEO)
1526 {
1527 fprintf(stdout,
1528 "Ingested '%s' (%d recs) and '%s' (%d recs).\n",
1529 filePathHELIO,
1530 addedRecsHELIO,
1531 filePathGEO,
1532 addedRecsGEO);
1533 }
1534 else if (filePathHELIO)
1535 {
1536 fprintf(stdout, "Ingested '%s' (%d recs).\n", filePathHELIO, addedRecsHELIO);
1537 }
1538 else
1539 {
1540 fprintf(stdout, "Ingested '%s' (%d recs).\n", filePathGEO, addedRecsGEO);
1541 }
1542 }
1543
1544 return error;
1545 }
1546
1547 int DoIt(void)
1548 {
1549 int error = 0;
1550 int stat = DRMS_SUCCESS;
1551
1552 if (drms_env == NULL)
1553 {
1554 error = 1;
1555 }
1556 else
1557 {
1558 char seriesin[DRMS_MAXSERIESNAMELEN];
1559 char seriesout[DRMS_MAXSERIESNAMELEN];
1560 char serieshist[DRMS_MAXSERIESNAMELEN];
1561 char *fdsTRange = NULL;
1562
1563 const char *ns = cmdparams_get_str(&cmdparams, "ns", NULL);
1564 const char *si = cmdparams_get_str(&cmdparams, "seriesIn", NULL);
1565 const char *so = cmdparams_get_str(&cmdparams, "seriesOut", NULL);
1566 const char *timeRange = cmdparams_get_str(&cmdparams, "timeRange", NULL);
1567 const char *owner = cmdparams_get_str(&cmdparams, "owner", NULL);
1568
1569 if (strcmp(owner, kNotSpecified) == 0)
1570 {
1571 owner = NULL;
1572 }
1573
1574 snprintf(seriesin, sizeof(seriesin), "%s.%s", ns, si);
1575 snprintf(seriesout, sizeof(seriesout), "%s.%s", ns, so);
1576 snprintf(serieshist, sizeof(serieshist), "%s.%s", ns, kSeriesHistory);
1577
1578 if (cmdparams_isflagset(&cmdparams, "c"))
1579 {
1580 CreateOutSeries(drms_env, seriesout, owner, &stat);
1581 return 0;
1582 }
1583
1584 /* Create the record set query */
1585 if (error == 0)
1586 {
1587 if (strcmp(timeRange, "NOT SPECIFIED") != 0)
1588 {
1589 size_t len = strlen(kObsDateKey) + strlen(timeRange) + 3;
1590 fdsTRange = malloc(sizeof(char) * len + 1);
1591 if (fdsTRange != NULL)
1592 {
1593 snprintf(fdsTRange, len + 1, "[%s=%s]", kObsDateKey, timeRange);
1594 }
1595 else
1596 {
1597 error = 1;
1598 }
1599 }
1600 }
1601
1602 if (error == 0)
1603 {
1604 /* Query database to locate the record(s) for a given time range */
1605 DRMS_RecordSet_t *rsInput = NULL;
1606 DRMS_RecordSet_t *rsHistory = NULL;
1607 DRMS_RecordSet_t *rsHELIO = NULL;
1608 DRMS_RecordSet_t *rsGEO = NULL;
1609 char queryInput[DRMS_MAXQUERYLEN];
1610 char queryHistory[DRMS_MAXQUERYLEN];
1611 char *filePathHELIO = NULL;
1612 char *filePathGEO = NULL;
1613 char queryCORE[DRMS_MAXQUERYLEN];
1614 char queryHELIO[DRMS_MAXQUERYLEN];
1615 char queryGEO[DRMS_MAXQUERYLEN];
1616 char hashbuf[kMaxHashKey];
1617 char path[DRMS_MAXPATHLEN];
1618
1619 char *obs = NULL;
1620 char *prodcomp = NULL;
1621 char *format = NULL;
1622 int filevers;
1623
1624 int history = 0;
1625
1626 HContainer_t *proclist = NULL;
1627
1628 /* Need to get the date of the last helio record ingested */
1629 if (drms_series_exists(drms_env, serieshist, &stat))
1630 {
1631 /* kSeriesHistory keeps track of all orbit data already ingested into
1632 * seriesout. So, open all records in seriesin, iterate through each
1633 * record and skip any records that are already in kSeriesHistory. */
1634
1635 /* Need this history series because not all files in seriesin will be
1636 * ingested. If a file is an updated version of a data file,
1637 * but the data don't change, then we add a record in serieshist, but
1638 * not in seriesout. */
1639
1640 history = 1;
1641 }
1642
1643 if (fdsTRange)
1644 {
1645 snprintf(queryInput,
1646 sizeof(queryInput),
1647 "%s[%s=%s]%s,%s[%s=%s]%s",
1648 seriesin,
1649 kFdsProductCompKey,
1650 kFdsProductCompHELIO,
1651 fdsTRange,
1652 seriesin,
1653 kFdsProductCompKey,
1654 kFdsProductCompGEO,
1655 fdsTRange);
1656 }
1657 else
1658 {
1659 snprintf(queryInput,
1660 sizeof(queryInput),
1661 "%s[%s=%s],%s[%s=%s]",
1662 seriesin,
1663 kFdsProductCompKey,
1664 kFdsProductCompHELIO,
1665 seriesin,
1666 kFdsProductCompKey,
1667 kFdsProductCompGEO);
1668 }
1669
1670 /* If no files to ingest, this will remain empty */
1671 proclist = hcon_create(kMaxHashKey,
1672 kMaxHashKey,
1673 NULL,
1674 NULL,
1675 NULL,
1676 NULL,
1677 0);
1678
1679 rsInput = drms_open_records(drms_env, queryInput, &stat);
1680 if (rsInput && rsInput->n > 0)
1681 {
1682 DRMS_Record_t *recin = NULL;
1683 int irec;
1684
1685 for (irec = 0; irec < rsInput->n && !error; irec++)
1686 {
1687 recin = rsInput->records[irec];
1688
1689 if (recin)
1690 {
1691 obs = drms_getkey_string(recin, kObsDateKey, &stat);
1692 prodcomp = drms_getkey_string(recin, kFdsProductCompKey, &stat);
1693 format = drms_getkey_string(recin, kFdsDataFormatKey, &stat);
1694 filevers = drms_getkey_int(recin, kFileVersionKey, &stat);
1695
1696 /* Skip if this record has already been ingested */
1697 if (history)
1698 {
1699 snprintf(queryHistory,
1700 sizeof(queryHistory),
1701 "%s[%s=%s][%s=%s][%s=%s][%s=%d]",
1702 serieshist,
1703 kObsDateKey,
1704 obs,
1705 kFdsProductCompKey,
1706 prodcomp,
1707 kFdsDataFormatKey,
1708 format,
1709 kFileVersionKey,
1710 filevers);
1711
1712 rsHistory = drms_open_records(drms_env, queryHistory, &stat);
1713 if (rsHistory && rsHistory->n > 0)
1714 {
1715 /* skip - already ingested */
1716 drms_close_records(rsHistory, DRMS_FREE_RECORD);
1717 continue;
1718 }
1719 else
1720 {
1721 drms_close_records(rsHistory, DRMS_FREE_RECORD);
1722 }
1723 }
1724
1725 /* Skip if this record has been processed in this session */
1726 snprintf(hashbuf,
1727 sizeof(hashbuf),
1728 "%s[%s=%s][%s=%s][%s=%d]",
1729 seriesin,
1730 kObsDateKey,
1731 obs,
1732 kFdsDataFormatKey,
1733 format,
1734 kFileVersionKey,
1735 filevers);
1736
1737 if (hcon_member(proclist, hashbuf))
1738 {
1739 continue;
1740 }
1741
1742 /* record is okay to process (and possible ingest) - but first collect
1743 * HELIO and GEO in pairs */
1744 snprintf(queryCORE,
1745 sizeof(queryCORE),
1746 "%s[%s=%s][%s=%s][%s=%d]",
1747 seriesin,
1748 kObsDateKey,
1749 obs,
1750 kFdsDataFormatKey,
1751 format,
1752 kFileVersionKey,
1753 filevers);
1754
1755 hcon_insert(proclist, queryCORE, queryCORE);
1756 }
1757
1758 if (obs)
1759 {
1760 free(obs);
1761 }
1762
1763 if (prodcomp)
1764 {
1765 free(prodcomp);
1766 }
1767
1768 if (format)
1769 {
1770 free(format);
1771 }
1772
1773 } /* record loop */
1774
1775 drms_close_records(rsInput, DRMS_FREE_RECORD);
1776 }
1777
1778 /* Now, open all HELIO and GEO files that match the core queries saved
1779 * in proclist */
1780 HIterator_t *hit = hiter_create(proclist);
1781 char *iquery = NULL;
1782
1783 while ((iquery = (char *)hiter_getnext(hit)) != NULL && !error)
1784 {
1785 #if 0
1786 /* If iquery starts with "###", then either the helio or geo (or both)
1787 * record does not need to be ingested - the data have already been
1788 * ingested from a file of an earlier version (but the data have not changed). */
1789 if (strncmp(iquery, "###", 3) == 0)
1790 {
1791 /* If there is another identical query, except that it does NOT start
1792 * with "###", then this means either the helio or geo data DID change
1793 * so we have to reingest the data (and it may have been ingested in
1794 * this session already). */
1795 if (hcon_member(proclist, iquery[3]))
1796 {
1797 /* The other matching record had data that did change. When iquery
1798 * points to that record, data for both helio and geo will be ingested.
1799 * Ignore this query. */
1800 continue;
1801 }
1802 else
1803 {
1804 /* There was a file with an updated version that had data that does not
1805 * differ from data that has already been ingested. But we have to
1806 * mark in the history that these data have been processed. */
1807 oktoadd = 0;
1808 iquery = iquery[3];
1809 }
1810 }
1811 #endif
1812 snprintf(queryHELIO,
1813 sizeof(queryHELIO),
1814 "%s[%s=%s]",
1815 iquery,
1816 kFdsProductCompKey,
1817 kFdsProductCompHELIO);
1818
1819 snprintf(queryGEO,
1820 sizeof(queryGEO),
1821 "%s[%s=%s]",
1822 iquery,
1823 kFdsProductCompKey,
1824 kFdsProductCompGEO);
1825
1826 DRMS_Record_t *recsv = NULL;
1827 TIME obsvalHELIO = 0;
1828 char *dprodvalHELIO = NULL;
1829 char *prodcompvalHELIO = NULL;
1830 char *formatvalHELIO = NULL;
1831 int fileversvalHELIO = 0;
1832 TIME obsvalGEO = 0;
1833 char *dprodvalGEO = NULL;
1834 char *prodcompvalGEO = NULL;
1835 char *formatvalGEO = NULL;
1836 int fileversvalGEO = 0;
1837
1838 rsHELIO = drms_open_records(drms_env, queryHELIO, &stat);
1839
1840 if (rsHELIO && stat != DRMS_ERROR_UNKNOWNSERIES)
1841 {
1842 if (rsHELIO->n != 1)
1843 {
1844 fprintf(stderr, "Expected only one record with query '%s'\n", queryHELIO);
1845 error = 1;
1846 }
1847 else
1848 {
1849 /* Get the HELIO filepath */
1850 DRMS_Record_t *recHELIO = rsHELIO->records[0];
1851
1852 DRMS_Segment_t *helioseg = drms_segment_lookup(recHELIO, kSVFileSegName);
1853
1854 if (helioseg != NULL)
1855 {
1856 drms_record_directory(recHELIO, path, 1);
1857
1858 if (*path)
1859 {
1860 size_t len = strlen(path) + strlen(helioseg->filename) + 1;
1861 filePathHELIO = malloc(sizeof(char) * len + 1);
1862 if (filePathHELIO)
1863 {
1864 snprintf(filePathHELIO, len + 1, "%s/%s", path, helioseg->filename);
1865 }
1866 }
1867 }
1868
1869 if (filePathHELIO)
1870 {
1871 recsv = rsHELIO->records[0];
1872 obsvalHELIO = drms_getkey_time(recsv, kObsDateKey, &stat);
1873 dprodvalHELIO = drms_getkey_string(recsv, kFdsDataProductKey, &stat);
1874 prodcompvalHELIO = drms_getkey_string(recsv, kFdsProductCompKey, &stat);
1875 formatvalHELIO = drms_getkey_string(recsv, kFdsDataFormatKey, &stat);
1876 fileversvalHELIO = drms_getkey_int(recsv, kFileVersionKey, &stat);
1877 }
1878 }
1879 }
1880
1881 if (!error)
1882 {
1883 rsGEO = drms_open_records(drms_env, queryGEO, &stat);
1884 }
1885
1886 if (rsGEO && stat != DRMS_ERROR_UNKNOWNSERIES)
1887 {
1888 if (rsGEO->n != 1)
1889 {
1890 fprintf(stderr, "Expected only one record with query '%s'\n", queryGEO);
1891 error = 1;
1892 }
1893 else
1894 {
1895 /* Get the GEO filepath */
1896 DRMS_Record_t *recGEO = rsGEO->records[0];
1897
1898 DRMS_Segment_t *geoseg = drms_segment_lookup(recGEO, kSVFileSegName);
1899
1900 if (geoseg != NULL)
1901 {
1902 drms_record_directory(recGEO, path, 1);
1903
1904 if (*path)
1905 {
1906 size_t len = strlen(path) + strlen(geoseg->filename) + 1;
1907 filePathGEO = malloc(sizeof(char) * len + 1);
1908 if (filePathGEO)
1909 {
1910 snprintf(filePathGEO, len + 1, "%s/%s", path, geoseg->filename);
1911 }
1912 }
1913 }
1914
1915 if (filePathGEO)
1916 {
1917 recsv = rsGEO->records[0];
1918 obsvalGEO = drms_getkey_time(recsv, kObsDateKey, &stat);
1919 dprodvalGEO = drms_getkey_string(recsv, kFdsDataProductKey, &stat);
1920 prodcompvalGEO = drms_getkey_string(recsv, kFdsProductCompKey, &stat);
1921 formatvalGEO = drms_getkey_string(recsv, kFdsDataFormatKey, &stat);
1922 fileversvalGEO = drms_getkey_int(recsv, kFileVersionKey, &stat);
1923 }
1924 }
1925 }
1926
1927 if (!error)
1928 {
1929 /* Create HELIO and GEO ids */
1930 char idHELIO[256];
1931 char idGEO[256];
1932
1933 if (rsHELIO && filePathHELIO)
1934 {
1935 char *datestr = drms_getkey_string(rsHELIO->records[0],
1936 kObsDateKey,
1937 &stat);
1938
1939 /* Date must come first - the indexing code forces this */
1940 snprintf(idHELIO,
1941 sizeof(idHELIO),
1942 "%s[%s][%s][%s][%d]",
1943 rsHELIO->records[0]->seriesinfo->seriesname,
1944 datestr,
1945 prodcompvalHELIO,
1946 formatvalHELIO,
1947 fileversvalHELIO);
1948
1949 if (datestr)
1950 {
1951 free(datestr);
1952 }
1953 }
1954
1955 if (rsGEO && filePathGEO)
1956 {
1957 char *datestr = drms_getkey_string(rsGEO->records[0],
1958 kObsDateKey,
1959 &stat);
1960
1961 /* Date must come first - the indexing code forces this */
1962 snprintf(idGEO,
1963 sizeof(idGEO),
1964 "%s[%s][%s][%s][%d]",
1965 rsGEO->records[0]->seriesinfo->seriesname,
1966 datestr,
1967 prodcompvalGEO,
1968 formatvalGEO,
1969 fileversvalGEO);
1970
1971 if (datestr)
1972 {
1973 free(datestr);
1974 }
1975 }
1976
1977 if (filePathHELIO || filePathGEO)
1978 {
1979 error = ExtractStateVectors(drms_env,
1980 filePathHELIO,
1981 idHELIO,
1982 filePathGEO,
1983 idGEO,
1984 seriesout,
1985 owner);
1986 }
1987 }
1988
1989 if (!error)
1990 {
1991 /* Add a record to the history series so that this record doesn't get
1992 * reingested - need to create 2 records, one for helio and one for
1993 * geo */
1994 if (!history)
1995 {
1996 /* Create history series. */
1997 CreateHistSeries(drms_env, serieshist, owner, &stat);
1998 if (!stat)
1999 {
2000 history = 1;
2001 }
2002 }
2003
2004 int nhist = 0;
2005 int ihist = 0;
2006
2007 if (filePathHELIO)
2008 {
2009 nhist++;
2010 }
2011
2012 if (filePathGEO)
2013 {
2014 nhist++;
2015 }
2016
2017 if (nhist > 0)
2018 {
2019 DRMS_RecordSet_t *rshist = drms_create_records(drms_env,
2020 nhist,
2021 serieshist,
2022 DRMS_PERMANENT,
2023 &stat);
2024 DRMS_Record_t *rechist = NULL;
2025
2026 if (rshist)
2027 {
2028 if (filePathHELIO)
2029 {
2030 rechist = rshist->records[ihist++];
2031 drms_setkey_time(rechist, kObsDateKey, obsvalHELIO);
2032 drms_setkey_string(rechist, kFdsDataProductKey, dprodvalHELIO);
2033 drms_setkey_string(rechist, kFdsProductCompKey, prodcompvalHELIO);
2034 drms_setkey_string(rechist, kFdsDataFormatKey, formatvalHELIO);
2035 drms_setkey_int(rechist, kFileVersionKey, fileversvalHELIO);
2036 }
2037
2038 if (filePathGEO)
2039 {
2040 rechist = rshist->records[ihist++];
2041 drms_setkey_time(rechist, kObsDateKey, obsvalGEO);
2042 drms_setkey_string(rechist, kFdsDataProductKey, dprodvalGEO);
2043 drms_setkey_string(rechist, kFdsProductCompKey, prodcompvalGEO);
2044 drms_setkey_string(rechist, kFdsDataFormatKey, formatvalGEO);
2045 drms_setkey_int(rechist, kFileVersionKey, fileversvalGEO);
2046 }
2047
2048 drms_close_records(rshist, DRMS_INSERT_RECORD);
2049 }
2050 }
2051 }
2052
2053 if (rsHELIO)
2054 {
2055 drms_close_records(rsHELIO, DRMS_FREE_RECORD);
2056 rsHELIO = NULL;
2057 }
2058
2059 if (rsGEO)
2060 {
2061 drms_close_records(rsGEO, DRMS_FREE_RECORD);
2062 rsGEO = NULL;
2063 }
2064
2065 if (dprodvalHELIO)
2066 {
2067 free(dprodvalHELIO);
2068 }
2069
2070 if (prodcompvalHELIO)
2071 {
2072 free(prodcompvalHELIO);
2073 }
2074
2075 if (formatvalHELIO)
2076 {
2077 free(formatvalHELIO);
2078 }
2079
2080 if (dprodvalGEO)
2081 {
2082 free(dprodvalGEO);
2083 }
2084
2085 if (prodcompvalGEO)
2086 {
2087 free(prodcompvalGEO);
2088 }
2089
2090 if (formatvalGEO)
2091 {
2092 free(formatvalGEO);
2093 }
2094
2095 } /* iquery - core queries */
2096
2097 hiter_destroy(&hit);
2098 hcon_destroy(&proclist);
2099 }
2100
2101 if (fdsTRange)
2102 {
2103 free(fdsTRange);
2104 }
2105 }
2106
2107 return error;
2108 }