/*
 *	$Source$
 *	$Author$
 *	$Header$
 *
 *	Copyright (C) 1989 by the Massachusetts Institute of Technology
 *	For copying and distribution information, please see the file
 *	.
 *
 * Embedded-SQL C source that queues "incremental update" records while a
 * transaction is in progress and, once they are released, runs one
 * external "SERVICE.incr" program per queued record.
 *
 * NOTE(review): this copy of the file has lost text in extraction.  The
 * file name in the copyright notice above, the names of the five system
 * headers below, and a large span of code inside incremental_init() are
 * missing.  Nothing has been reconstructed; the surviving tokens are
 * reproduced unchanged.
 */

#ifndef lint
static char *rcsid_increment_dc = "$Header$";
#endif lint

/* NOTE(review): the five header names below are missing from this copy. */
#include
#include
#include
#include
#include
#include "mr_server.h"
#include "query.h"
#include "qrtn.h"

EXEC SQL INCLUDE sqlca;

extern char *whoami;
extern char *table_name[];
extern int num_tables;

/* State of the most recently started update program; set in
 * next_incremental(): pid returned by vfork(), running flag, start time. */
int inc_pid = 0;
int inc_running = 0;
time_t inc_started;

/* Capacity of the before[] and after[] argument arrays. */
#define MAXARGC 15

EXEC SQL WHENEVER SQLERROR DO dbmserr();

/* structures to save before args */
static char *before[MAXARGC];
static int beforec;
static enum tables beforetable;

/* structures to save after args */
static char *after[MAXARGC];
static int afterc;

/* structures to save entire sets of incremental changes */
/* incremental_sq receives new records; incremental_update() moves them to
 * incremental_exec, from which next_incremental() takes them to run. */
struct save_queue *incremental_sq = NULL;
struct save_queue *incremental_exec = NULL;

/* One queued change: the table name, copies of the before and after
 * argument vectors with their counts, and the service whose ".incr"
 * program is to be run for it (filled in by incremental_update()). */
struct iupdate {
    char *table;
    int beforec;
    char **before;
    int afterc;
    char **after;
    char *service;
};

void next_incremental(void);
char **copy_argv(char **argv, int argc);
void free_argv(char **argv, int argc);
int table_num(char *table);

/* Create the two save queues if they do not exist yet.
 *
 * NOTE(review): the source is damaged from the "for(i=0;" line onward.
 * The text between the loop condition and the "iu->table" assignment is
 * missing, so the remainder of this function and the start of the function
 * that actually owns the statements below are absent.  The declarations
 * of iu, buffer, table and i used below are part of the missing text.
 * Judging by the call in incremental_clear_after(), the surviving tail
 * presumably belongs to incremental_after() -- confirm against an intact
 * copy.
 */
void incremental_init(void)
{
    int i;

    if (incremental_sq == NULL)
	incremental_sq = sq_create();
    if (incremental_exec == NULL)
	incremental_exec = sq_create();

    for(i=0; itable = table_name[table];
    /* Snapshot the saved before/after vectors into the new record and
     * append it to the queue of changes not yet released. */
    iu->beforec = beforec;
    iu->before = copy_argv(before, beforec);
    iu->afterc = afterc;
    iu->after = copy_argv(after, afterc);
    sq_save_data(incremental_sq, iu);

#ifdef DEBUG
    /* Log the record as INCREMENTAL(table, [before...], [after...]). */
    sprintf(buffer, "INCREMENTAL(%s, [", table_name[table]);
    for (i = 0; i < beforec; i++) {
	if (i == 0)
	    strcat(buffer, strtrim(before[0]));
	else {
	    strcat(buffer, ", ");
	    strcat(buffer, strtrim(before[i]));
	}
    }
    strcat(buffer, "], [");
    for (i = 0; i < afterc; i++) {
	if (i == 0)
	    strcat(buffer, strtrim(after[0]));
	else {
	    strcat(buffer, ", ");
	    strcat(buffer, strtrim(after[i]));
	}
    }
    strcat(buffer, "])");
    /* NOTE(review): buffer is passed as the format argument here, so any
     * '%' in the logged data would be interpreted by com_err. */
    com_err(whoami, 0, buffer);
#endif DEBUG
}

/* Call incremental_after() with NO_TABLE and no qualifier or arguments. */
void incremental_clear_after(void)
{
    incremental_after(NO_TABLE, NULL, NULL);
}

/* Called when the current transaction is committed to start any queued
 * incremental updates.  This caches the update table the first time it
 * is called.
 */

/* One cached row of the "incremental" database table: a table name and
 * the service associated with it, kept on a singly linked list. */
struct inc_cache {
    struct inc_cache *next;
    char *table, *service;
};

/* On the first call, read every (table_name, service) row of the
 * "incremental" table into the cache.  On every call, drain
 * incremental_sq, re-queueing each record on incremental_exec once for
 * each cache entry naming the same table, then call next_incremental()
 * if no update is marked as running. */
void incremental_update(void)
{
    static int inited = 0;
    static struct inc_cache *cache;
    struct inc_cache *c;
    EXEC SQL BEGIN DECLARE SECTION;
    char tab[17], serv[17];
    EXEC SQL END DECLARE SECTION;
    struct iupdate *iu;

    if (!inited) {
	inited++;

	EXEC SQL DECLARE inc CURSOR FOR SELECT table_name, service
	    FROM incremental;
	EXEC SQL OPEN inc;
	while (1) {
	    EXEC SQL FETCH inc INTO :tab, :serv;
	    /* Any non-zero sqlcode ends the loop. */
	    if (sqlca.sqlcode != 0)
		break;
	    /* NOTE(review): the malloc result is used without a NULL check. */
	    c = (struct inc_cache *)malloc(sizeof(struct inc_cache));
	    /* Push onto the front of the list. */
	    c->next = cache;
	    c->table = strsave(strtrim(tab));
	    c->service = strsave(strtrim(serv));
	    cache = c;
	}
	EXEC SQL CLOSE inc;
	EXEC SQL COMMIT WORK;
    }

    while (sq_remove_data(incremental_sq, &iu)) {
	/* NOTE(review): the same iu pointer is saved once per matching
	 * cache entry, with iu->service overwritten each time.  If two
	 * entries name the same table, next_incremental() will dequeue and
	 * free that pointer twice, and both runs will see the last service
	 * assigned.  Conversely, a record that matches no entry is neither
	 * re-queued nor freed here.  Confirm and fix against an intact
	 * copy of the file. */
	for (c = cache; c; c = c->next) {
	    if (!strcmp(c->table, iu->table)) {
		iu->service = c->service;
		sq_save_data(incremental_exec, iu);
	    }
	}
    }

    if (inc_running == 0)
	next_incremental();
}

/* Take the next record off incremental_exec and run
 * "BIN_DIR/SERVICE.incr" for it via vfork() and execv().
 *
 * The child's argument vector is:
 *   argv[0]  program path
 *   argv[1]  table name
 *   argv[2]  number of before args, in decimal
 *   argv[3]  number of after args, in decimal
 *   then the before args, then the after args, then a terminating 0.
 *
 * Returns without doing anything when the queue is empty, or when an
 * update is marked running and has been so for less than INC_TIMEOUT.
 * The record is freed before returning, whether or not the fork worked.
 */
void next_incremental(void)
{
    struct iupdate *iu;
    /* NOTE(review): MAXARGC * 2 + 4 leaves no slot for the terminating 0
     * when beforec and afterc are both MAXARGC (index 4 + 2 * MAXARGC is
     * one past the end).  Whether both counts can reach MAXARGC is not
     * visible in this copy.  cafter and cbefore hold at most two digits. */
    char *argv[MAXARGC * 2 + 4], cafter[3], cbefore[3], prog[BUFSIZ];
    int i;
    sigset_t sigs;

    if (incremental_exec == NULL)
	incremental_init();

    /* "now" is not declared in this file; presumably it comes from one of
     * the included headers -- confirm. */
    if (sq_empty(incremental_exec) ||
	(inc_running && now - inc_started < INC_TIMEOUT))
	return;

    /* Reaching here with inc_running set means the timeout has passed;
     * log it and go on to start the next update anyway. */
    if (inc_running)
	com_err(whoami, 0, "incremental timeout on pid %d", inc_pid);

    sq_remove_data(incremental_exec, &iu);
    argv[1] = iu->table;
    sprintf(cbefore, "%d", iu->beforec);
    argv[2] = cbefore;
    sprintf(cafter, "%d", iu->afterc);
    argv[3] = cafter;
    for (i = 0; i < iu->beforec; i++)
	argv[4 + i] = iu->before[i];
    for (i = 0; i < iu->afterc; i++)
	argv[4 + iu->beforec + i] = iu->after[i];

    /* NOTE(review): unbounded sprintf into prog[BUFSIZ]. */
    sprintf(prog, "%s/%s.incr", BIN_DIR, iu->service);
#ifdef DEBUG
    com_err(whoami, 0, "forking %s", prog);
#endif
    argv[0] = prog;
    argv[4 + iu->beforec + iu->afterc] = 0;

    /* SIGCHLD is blocked from before the vfork() until inc_running and
     * inc_started have been set -- presumably so a SIGCHLD handler
     * elsewhere cannot run before this state is recorded; confirm. */
    sigemptyset(&sigs);
    sigaddset(&sigs, SIGCHLD);
    sigprocmask(SIG_BLOCK, &sigs, NULL);
    inc_pid = vfork();
    switch (inc_pid) {
    case 0:
	/* Child: _exit(1) is reached only if execv() returns. */
	execv(prog, argv);
	_exit(1);
    case -1:
	com_err(whoami, 0, "Failed to start incremental update");
	break;
    default:
	/* Parent: record that an update is running and when it began. */
	inc_running = 1;
	inc_started = now;
    }
    sigprocmask(SIG_UNBLOCK, &sigs, NULL);

    free_argv(iu->before, iu->beforec);
    free_argv(iu->after, iu->afterc);
    free(iu);
}

/* Called when the current transaction is aborted to throw away any queued
 * incremental updates
 */
/* Frees every record obtained from incremental_sq via sq_get_data(), then
 * destroys the queue and replaces it with a newly created one. */
void incremental_flush(void)
{
    struct iupdate *iu;

    while (sq_get_data(incremental_sq, &iu)) {
	free_argv(iu->before, iu->beforec);
	free_argv(iu->after, iu->afterc);
	free(iu);
    }
    sq_destroy(incremental_sq);
    incremental_sq = sq_create();
}

/* Return a newly malloc'ed array of argc strings, each produced by
 * strsave(strtrim(argv[i])).  Release the result with free_argv().
 *
 * NOTE(review): the malloc result is not checked, and strtrim() is applied
 * to the caller's own strings; whether it modifies them in place is not
 * visible here.  The definition is old-style while the prototype above is
 * new-style.
 */
char **copy_argv(argv, argc)
char **argv;
int argc;
{
    char **ret = (char **)malloc(sizeof(char *) * argc);

    /* Fill from the last element down to index 0. */
    while (--argc >= 0)
	ret[argc] = strsave(strtrim(argv[argc]));
    return(ret);
}

/* Free each of the argc strings in argv, then the array itself. */
void free_argv(argv, argc)
char **argv;
int argc;
{
    while (--argc >= 0)
	free(argv[argc]);
    free(argv);
}

/* Return the index in table_name[] of the entry equal to name.  The scan
 * runs from num_tables-1 down to 1; index 0 is never compared and is the
 * value returned when nothing matches. */
int table_num(char *name)
{
    int i;

    for(i = num_tables-1; i; i--)
	if(!strcmp(table_name[i], name))
	    break;

    return i; /* 0 = "none" if no match */
}