/* $OpenBSD: rf_engine.c,v 1.1 1999/01/11 14:29:19 niklas Exp $ */ /* $NetBSD: rf_engine.c,v 1.2 1998/11/13 11:48:26 simonb Exp $ */ /* * Copyright (c) 1995 Carnegie-Mellon University. * All rights reserved. * * Author: William V. Courtright II, Mark Holland, Rachad Youssef * * Permission to use, copy, modify and distribute this software and * its documentation is hereby granted, provided that both the copyright * notice and this permission notice appear in all copies of the * software, derivative works or modified versions, and any portions * thereof, and that both notices appear in supporting documentation. * * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. * * Carnegie Mellon requests users of this software to return to * * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU * School of Computer Science * Carnegie Mellon University * Pittsburgh PA 15213-3890 * * any improvements or extensions that they make and grant Carnegie the * rights to redistribute these changes. */ /**************************************************************************** * * * engine.c -- code for DAG execution engine * * * * Modified to work as follows (holland): * * A user-thread calls into DispatchDAG, which fires off the nodes that * * are direct successors to the header node. DispatchDAG then returns, * * and the rest of the I/O continues asynchronously. As each node * * completes, the node execution function calls FinishNode(). FinishNode * * scans the list of successors to the node and increments the antecedent * * counts. Each node that becomes enabled is placed on a central node * * queue. A dedicated dag-execution thread grabs nodes off of this * * queue and fires them. * * * * NULL nodes are never fired. 
* * * * Terminator nodes are never fired, but rather cause the callback * * associated with the DAG to be invoked. * * * * If a node fails, the dag either rolls forward to completion or * * rolls back, undoing previously-completed nodes so that it fails atomically. * * The direction of recovery is determined by the location of the failed * * node in the graph. If the failure occurred before the commit node in * * the graph, backward recovery is used. Otherwise, forward recovery is * * used. * * * ****************************************************************************/ /* * : * * Log: rf_engine.c,v * Revision 1.56 1996/07/28 20:31:39 jimz * i386netbsd port * true/false fixup * * Revision 1.55 1996/07/22 19:52:16 jimz * switched node params to RF_DagParam_t, a union of * a 64-bit int and a void *, for better portability * attempted hpux port, but failed partway through for * lack of a single C compiler capable of compiling all * source files * * Revision 1.54 1996/07/17 21:00:58 jimz * clean up timer interface, tracing * * Revision 1.53 1996/07/15 17:22:18 jimz * nit-pick code cleanup * resolve stdlib problems on DEC OSF * * Revision 1.52 1996/06/17 03:17:08 jimz * correctly shut down engine thread in kernel * * Revision 1.51 1996/06/14 15:02:10 jimz * make new engine code happy in simulator * * Revision 1.50 1996/06/14 14:19:48 jimz * use diskgroup to control engine thread, make all engine-thread-related * stuff per-array * * Revision 1.49 1996/06/10 11:55:47 jimz * Straightened out some per-array/not-per-array distinctions, fixed * a couple bugs related to confusion. Added shutdown lists. Removed * layout shutdown function (now subsumed by shutdown lists).
* * Revision 1.48 1996/06/09 02:36:46 jimz * lots of little crufty cleanup- fixup whitespace * issues, comment #ifdefs, improve typing in some * places (esp size-related) * * Revision 1.47 1996/06/06 01:23:23 jimz * fix bug in node traversal when firing multiple nodes simultaneously * * Revision 1.46 1996/06/05 18:06:02 jimz * Major code cleanup. The Great Renaming is now done. * Better modularity. Better typing. Fixed a bunch of * synchronization bugs. Made a lot of global stuff * per-desc or per-array. Removed dead code. * * Revision 1.45 1996/05/30 12:59:18 jimz * make etimer happier, more portable * * Revision 1.44 1996/05/30 11:29:41 jimz * Numerous bug fixes. Stripe lock release code disagreed with the taking code * about when stripes should be locked (I made it consistent: no parity, no lock) * There was a lot of extra serialization of I/Os which I've removed- a lot of * it was to calculate values for the cache code, which is no longer with us. * More types, function, macro cleanup. Added code to properly quiesce the array * on shutdown. Made a lot of stuff array-specific which was (bogusly) general * before. Fixed memory allocation, freeing bugs. 
* * Revision 1.43 1996/05/27 18:56:37 jimz * more code cleanup * better typing * compiles in all 3 environments * * Revision 1.42 1996/05/24 22:17:04 jimz * continue code + namespace cleanup * typed a bunch of flags * * Revision 1.41 1996/05/24 04:28:55 jimz * release cleanup ckpt * * Revision 1.40 1996/05/23 00:33:23 jimz * code cleanup: move all debug decls to rf_options.c, all extern * debug decls to rf_options.h, all debug vars preceded by rf_ * * Revision 1.39 1996/05/20 16:15:17 jimz * switch to rf_{mutex,cond}_{init,destroy} * * Revision 1.38 1996/05/18 20:09:54 jimz * bit of cleanup to compile cleanly in kernel, once again * * Revision 1.37 1996/05/18 19:51:34 jimz * major code cleanup- fix syntax, make some types consistent, * add prototypes, clean out dead code, et cetera * * Revision 1.36 1996/05/15 20:24:19 wvcii * fixed syntax bug in SIMULATE clause above ProcessNode * * Revision 1.35 1996/05/08 21:01:24 jimz * fixed up enum type names that were conflicting with other * enums and function names (ie, "panic") * future naming trends will be towards RF_ and rf_ for * everything raidframe-related * * Revision 1.34 1996/05/08 15:25:28 wvcii * eliminated dead code * merged common cases (sim/user/kernel) * entire node lists (arrays) now fired atomically * reordered source code for readability * beefed-up & corrected comments * * Revision 1.33 1996/05/07 19:39:40 jimz * 1. fixed problems in PropogateResults() with nodes being referenced * after they were no longer valid * 2. fixed problems in PropogateResults() with the node list being * incorrectly threaded * * Revision 1.32 1996/05/07 19:03:56 wvcii * in PropagateResults, fixed a bug in the rollBackward case: * node data is copied before the call to FinishNode which * frees the node and destroys its data. 
* * Revision 1.31 1996/05/07 17:45:17 jimz * remove old #if 0 code from PropogateResults() (was kept in * previous version for archival purposes (rcsdiff)) * * Revision 1.30 1996/05/07 17:44:19 jimz * fix threading of nodes to be fired in PropagateResults() * fix iteration through skiplist in PropagateResults() * fix incorrect accesses to freed memory (dereferencing a * node that was freed by the action of calling FinishNode() * on it, which in turn completed its DAG) in PropagateResults() * * Revision 1.29 1996/05/02 15:04:15 wvcii * fixed bad array index in PropagateResults * * Revision 1.28 1995/12/12 18:10:06 jimz * MIN -> RF_MIN, MAX -> RF_MAX, ASSERT -> RF_ASSERT * fix 80-column brain damage in comments * * Revision 1.27 1995/12/08 15:07:03 arw * cache code cleanup * * Revision 1.26 1995/11/07 16:18:01 wvcii * numerous changes associated with roll-away error recovery * when a node fails, dag enters rollForward or rollBackward state * * Revision 1.25 1995/09/06 19:27:17 wvcii * added debug vars enableRollAway and debugRecovery * */ #ifdef _KERNEL #define KERNEL #endif #include "rf_threadstuff.h" #ifndef KERNEL #include #include #endif /* !KERNEL */ #include #include "rf_dag.h" #include "rf_engine.h" #include "rf_threadid.h" #include "rf_etimer.h" #include "rf_general.h" #include "rf_dagutils.h" #include "rf_shutdown.h" #include "rf_raid.h" #ifndef SIMULATE static void DAGExecutionThread(RF_ThreadArg_t arg); #endif /* !SIMULATE */ #define DO_INIT(_l_,_r_) { \ int _rc; \ _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ if (_rc) { \ return(_rc); \ } \ _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ if (_rc) { \ return(_rc); \ } \ } /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. 
*/ #ifndef KERNEL #define DO_LOCK(_r_) RF_LOCK_MUTEX((_r_)->node_queue_mutex) #define DO_UNLOCK(_r_) RF_UNLOCK_MUTEX((_r_)->node_queue_mutex) #define DO_WAIT(_r_) RF_WAIT_COND((_r_)->node_queue_cond, (_r_)->node_queue_mutex) #define DO_SIGNAL(_r_) RF_SIGNAL_COND((_r_)->node_queue_cond) #else /* !KERNEL */ /* * XXX Is this spl-ing really necessary? */ #define DO_LOCK(_r_) { ks = splbio(); RF_LOCK_MUTEX((_r_)->node_queue_mutex); } #define DO_UNLOCK(_r_) { RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); splx(ks); } #if !defined(__NetBSD__) && !defined(__OpenBSD__) #define DO_WAIT(_r_) mpsleep(&(_r_)->node_queue, PZERO, "raidframe nq", 0, (void *) simple_lock_addr((_r_)->node_queue_mutex), MS_LOCK_SIMPLE) #else #define DO_WAIT(_r_) tsleep(&(_r_)->node_queue, PRIBIO | PCATCH, "raidframe nq",0) #endif #define DO_SIGNAL(_r_) wakeup(&(_r_)->node_queue) #endif /* !KERNEL */ static void rf_ShutdownEngine(void *); static void rf_ShutdownEngine(arg) void *arg; { RF_Raid_t *raidPtr; raidPtr = (RF_Raid_t *)arg; #ifndef SIMULATE raidPtr->shutdown_engine = 1; DO_SIGNAL(raidPtr); /* XXX something is missing here... */ #ifdef DEBUG printf("IGNORING WAIT_STOP\n"); #endif #if 0 RF_THREADGROUP_WAIT_STOP(&raidPtr->engine_tg); #endif #endif /* !SIMULATE */ } int rf_ConfigureEngine( RF_ShutdownList_t **listp, RF_Raid_t *raidPtr, RF_Config_t *cfgPtr) { int rc, tid=0; if (rf_engineDebug) { rf_get_threadid(tid); } DO_INIT(listp,raidPtr); raidPtr->node_queue = NULL; raidPtr->dags_in_flight = 0; #ifndef SIMULATE rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); if (rc) return(rc); /* we create the execution thread only once per system boot. * no need to check return code b/c the kernel panics if it can't create the thread. 
*/ if (rf_engineDebug) { printf("[%d] Creating engine thread\n", tid); } if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr)) { RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); return(ENOMEM); } if (rf_engineDebug) { printf("[%d] Created engine thread\n", tid); } RF_THREADGROUP_STARTED(&raidPtr->engine_tg); /* XXX something is missing here... */ #ifdef debug printf("Skipping the WAIT_START!!\n"); #endif #if 0 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); #endif /* engine thread is now running and waiting for work */ if (rf_engineDebug) { printf("[%d] Engine thread running and waiting for events\n", tid); } #endif /* !SIMULATE */ rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); if (rc) { RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, __LINE__, rc); rf_ShutdownEngine(NULL); } return(rc); } static int BranchDone(RF_DagNode_t *node) { int i; /* return true if forward execution is completed for a node and it's succedents */ switch (node->status) { case rf_wait : /* should never be called in this state */ RF_PANIC(); break; case rf_fired : /* node is currently executing, so we're not done */ return(RF_FALSE); case rf_good : for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ if (!BranchDone(node->succedents[i])) /* recursively check branch */ return RF_FALSE; return RF_TRUE; /* node and all succedent branches aren't in fired state */ break; case rf_bad : /* succedents can't fire */ return(RF_TRUE); case rf_recover : /* should never be called in this state */ RF_PANIC(); break; case rf_undone : case rf_panic : /* XXX need to fix this case */ /* for now, assume that we're done */ return(RF_TRUE); break; default : /* illegal node status */ RF_PANIC(); break; } } #ifdef SIMULATE /* this is only ifdef SIMULATE because nothing else needs it */ /* recursively determine if a DAG has completed execution */ static int DAGDone(RF_DagHeader_t *dag) { int i; for (i = 0; i < 
dag->numSuccedents; i++) if (!BranchDone(dag->succedents[i])) return RF_FALSE; return RF_TRUE; } #endif /* SIMULATE */ static int NodeReady(RF_DagNode_t *node) { int ready; switch (node->dagHdr->status) { case rf_enable : case rf_rollForward : if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) ready = RF_TRUE; else ready = RF_FALSE; break; case rf_rollBackward : RF_ASSERT(node->numSuccDone <= node->numSuccedents); RF_ASSERT(node->numSuccFired <= node->numSuccedents); RF_ASSERT(node->numSuccFired <= node->numSuccDone); if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) ready = RF_TRUE; else ready = RF_FALSE; break; default : printf("Execution engine found illegal DAG status in NodeReady\n"); RF_PANIC(); break; } return(ready); } /* user context and dag-exec-thread context: * Fire a node. The node's status field determines which function, do or undo, * to be fired. * This routine assumes that the node's status field has alread been set to * "fired" or "recover" to indicate the direction of execution. 
*/ static void FireNode(RF_DagNode_t *node) { int tid; switch (node->status) { case rf_fired : /* fire the do function of a node */ if (rf_engineDebug) { rf_get_threadid(tid); printf("[%d] Firing node 0x%lx (%s)\n",tid,(unsigned long) node, node->name); } #ifdef KERNEL if (node->flags & RF_DAGNODE_FLAG_YIELD) { #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL) /* thread_block(); */ /* printf("Need to block the thread here...\n"); */ /* XXX thread_block is actually mentioned in /usr/include/vm/vm_extern.h */ #else thread_block(); #endif } #endif /* KERNEL */ (*(node->doFunc)) (node); break; case rf_recover : /* fire the undo function of a node */ if (rf_engineDebug || 1) { rf_get_threadid(tid); printf("[%d] Firing (undo) node 0x%lx (%s)\n",tid,(unsigned long) node, node->name); } #ifdef KERNEL if (node->flags & RF_DAGNODE_FLAG_YIELD) #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL) /* thread_block(); */ /* printf("Need to block the thread here...\n"); */ /* XXX thread_block is actually mentioned in /usr/include/vm/vm_extern.h */ #else thread_block(); #endif #endif /* KERNEL */ (*(node->undoFunc)) (node); break; default : RF_PANIC(); break; } } /* user context: * Attempt to fire each node in a linear array. * The entire list is fired atomically. 
*/ static void FireNodeArray( int numNodes, RF_DagNode_t **nodeList) { RF_DagStatus_t dstat; RF_DagNode_t *node; int i, j; /* first, mark all nodes which are ready to be fired */ for (i = 0; i < numNodes; i++) { node = nodeList[i]; dstat = node->dagHdr->status; RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); if (NodeReady(node)) { if ((dstat == rf_enable) || (dstat == rf_rollForward)) { RF_ASSERT(node->status == rf_wait); if (node->commitNode) node->dagHdr->numCommits++; node->status = rf_fired; for (j = 0; j < node->numAntecedents; j++) node->antecedents[j]->numSuccFired++; } else { RF_ASSERT(dstat == rf_rollBackward); RF_ASSERT(node->status == rf_good); RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node per graph */ node->status = rf_recover; } } } /* now, fire the nodes */ for (i = 0; i < numNodes; i++) { if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) FireNode(nodeList[i]); } } #ifndef SIMULATE /* user context: * Attempt to fire each node in a linked list. * The entire list is fired atomically. 
*/ static void FireNodeList(RF_DagNode_t *nodeList) { RF_DagNode_t *node, *next; RF_DagStatus_t dstat; int j; if (nodeList) { /* first, mark all nodes which are ready to be fired */ for (node = nodeList; node; node = next) { next = node->next; dstat = node->dagHdr->status; RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); if (NodeReady(node)) { if ((dstat == rf_enable) || (dstat == rf_rollForward)) { RF_ASSERT(node->status == rf_wait); if (node->commitNode) node->dagHdr->numCommits++; node->status = rf_fired; for (j = 0; j < node->numAntecedents; j++) node->antecedents[j]->numSuccFired++; } else { RF_ASSERT(dstat == rf_rollBackward); RF_ASSERT(node->status == rf_good); RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node per graph */ node->status = rf_recover; } } } /* now, fire the nodes */ for (node = nodeList; node; node = next) { next = node->next; if ((node->status == rf_fired) || (node->status == rf_recover)) FireNode(node); } } } #endif /* !SIMULATE */ /* interrupt context: * for each succedent * propagate required results from node to succedent * increment succedent's numAntDone * place newly-enable nodes on node queue for firing * * To save context switches, we don't place NIL nodes on the node queue, * but rather just process them as if they had fired. Note that NIL nodes * that are the direct successors of the header will actually get fired by * DispatchDAG, which is fine because no context switches are involved. * * Important: when running at user level, this can be called by any * disk thread, and so the increment and check of the antecedent count * must be locked. I used the node queue mutex and locked down the * entire function, but this is certainly overkill. 
*/ static void PropagateResults( RF_DagNode_t *node, int context) { RF_DagNode_t *s, *a; RF_Raid_t *raidPtr; int tid, i, ks; #ifdef SIMULATE RF_PropHeader_t *p; /* prop list for succedent i */ #else /* SIMULATE */ RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be finished */ RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata antecedents */ RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ RF_DagNode_t *q = NULL, *qh = NULL, *next; int j, skipNode; #endif /* SIMULATE */ rf_get_threadid(tid); raidPtr = node->dagHdr->raidPtr; DO_LOCK(raidPtr); /* debug - validate fire counts */ for (i = 0; i < node->numAntecedents; i++) { a = *(node->antecedents + i); RF_ASSERT(a->numSuccFired >= a->numSuccDone); RF_ASSERT(a->numSuccFired <= a->numSuccedents); a->numSuccDone++; } switch (node->dagHdr->status) { case rf_enable : case rf_rollForward : #ifdef SIMULATE /* currently we never propagate results unless in simulation */ for (i = 0; i < node->numSuccedents; i++) { s = *(node->succedents + i); RF_ASSERT(s->status == rf_wait); (s->numAntDone)++; if (node->propList == NULL) /* null propList implies no results to be propagated */ p = NULL; else /* p=head of prop list for succedent i */ p = *(node->propList + i); while (p != NULL) { /* bind node results to succedent's parameters */ #if 0 *(s->params + p->paramNum) = *(node->results + p->resultNum); #else s->params[p->paramNum].p = node->results[p->resultNum]; #endif p = p->next; } } #else /* SIMULATE */ for (i = 0; i < node->numSuccedents; i++) { s = *(node->succedents + i); RF_ASSERT(s->status == rf_wait); (s->numAntDone)++; if (s->numAntDone == s->numAntecedents) { /* look for NIL nodes */ if (s->doFunc == rf_NullNodeFunc) { /* don't fire NIL nodes, just process them */ s->next = finishlist; finishlist = s; } else { /* look to see if the node is to be skipped */ skipNode = RF_FALSE; for (j = 0; j < s->numAntecedents; j++) if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status 
== rf_bad)) skipNode = RF_TRUE; if (skipNode) { /* this node has one or more failed true data dependencies, so skip it */ s->next = skiplist; skiplist = s; } else /* add s to list of nodes (q) to execute */ if (context != RF_INTR_CONTEXT) { /* we only have to enqueue if we're at intr context */ s->next = firelist; /* put node on a list to be fired after we unlock */ firelist = s; } else { /* enqueue the node for the dag exec thread to fire */ RF_ASSERT(NodeReady(s)); if (q) { q->next = s; q = s; } else { qh = q = s; qh->next = NULL; } } } } } if (q) { /* xfer our local list of nodes to the node queue */ q->next = raidPtr->node_queue; raidPtr->node_queue = qh; DO_SIGNAL(raidPtr); } DO_UNLOCK(raidPtr); for (; skiplist; skiplist = next) { next = skiplist->next; skiplist->status = rf_skipped; for (i = 0; i < skiplist->numAntecedents; i++) { skiplist->antecedents[i]->numSuccFired++; } if (skiplist->commitNode) { skiplist->dagHdr->numCommits++; } rf_FinishNode(skiplist, context); } for (; finishlist; finishlist = next) { /* NIL nodes: no need to fire them */ next = finishlist->next; finishlist->status = rf_good; for (i = 0; i < finishlist->numAntecedents; i++) { finishlist->antecedents[i]->numSuccFired++; } if (finishlist->commitNode) finishlist->dagHdr->numCommits++; /* * Okay, here we're calling rf_FinishNode() on nodes that * have the null function as their work proc. Such a node * could be the terminal node in a DAG. If so, it will * cause the DAG to complete, which will in turn free * memory used by the DAG, which includes the node in * question. Thus, we must avoid referencing the node * at all after calling rf_FinishNode() on it. 
*/ rf_FinishNode(finishlist, context); /* recursive call */ } /* fire all nodes in firelist */ FireNodeList(firelist); #endif /* SIMULATE */ break; case rf_rollBackward : #ifdef SIMULATE #else /* SIMULATE */ for (i = 0; i < node->numAntecedents; i++) { a = *(node->antecedents + i); RF_ASSERT(a->status == rf_good); RF_ASSERT(a->numSuccDone <= a->numSuccedents); RF_ASSERT(a->numSuccDone <= a->numSuccFired); if (a->numSuccDone == a->numSuccFired) { if (a->undoFunc == rf_NullNodeFunc) { /* don't fire NIL nodes, just process them */ a->next = finishlist; finishlist = a; } else { if (context != RF_INTR_CONTEXT) { /* we only have to enqueue if we're at intr context */ a->next = firelist; /* put node on a list to be fired after we unlock */ firelist = a; } else { /* enqueue the node for the dag exec thread to fire */ RF_ASSERT(NodeReady(a)); if (q) { q->next = a; q = a; } else { qh = q = a; qh->next = NULL; } } } } } if (q) { /* xfer our local list of nodes to the node queue */ q->next = raidPtr->node_queue; raidPtr->node_queue = qh; DO_SIGNAL(raidPtr); } DO_UNLOCK(raidPtr); for (; finishlist; finishlist = next) { /* NIL nodes: no need to fire them */ next = finishlist->next; finishlist->status = rf_good; /* * Okay, here we're calling rf_FinishNode() on nodes that * have the null function as their work proc. Such a node * could be the first node in a DAG. If so, it will * cause the DAG to complete, which will in turn free * memory used by the DAG, which includes the node in * question. Thus, we must avoid referencing the node * at all after calling rf_FinishNode() on it. 
*/ rf_FinishNode(finishlist, context); /* recursive call */ } /* fire all nodes in firelist */ FireNodeList(firelist); #endif /* SIMULATE */ break; default : printf("Engine found illegal DAG status in PropagateResults()\n"); RF_PANIC(); break; } } /* * Process a fired node which has completed */ static void ProcessNode( RF_DagNode_t *node, int context) { RF_Raid_t *raidPtr; int tid; raidPtr = node->dagHdr->raidPtr; switch (node->status) { case rf_good : /* normal case, don't need to do anything */ break; case rf_bad : if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { node->dagHdr->status = rf_rollForward; /* crossed commit barrier */ if (rf_engineDebug || 1) { rf_get_threadid(tid); printf("[%d] node (%s) returned fail, rolling forward\n", tid, node->name); } } else { node->dagHdr->status = rf_rollBackward; /* never reached commit barrier */ if (rf_engineDebug || 1) { rf_get_threadid(tid); printf("[%d] node (%s) returned fail, rolling backward\n", tid, node->name); } } break; case rf_undone : /* normal rollBackward case, don't need to do anything */ break; case rf_panic : /* an undo node failed!!! */ printf("UNDO of a node failed!!!/n"); break; default : printf("node finished execution with an illegal status!!!\n"); RF_PANIC(); break; } #ifdef SIMULATE /* simulator fires nodes here. * user/kernel rely upon PropagateResults to do this. * XXX seems like this code should be merged so that the same thing happens for * both sim, user, and kernel. 
-wvcii */ switch (node->dagHdr->status) { case rf_enable : case rf_rollForward : if (node->numSuccedents == 0) { /* process terminal node */ if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { rf_get_threadid(tid); printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); RF_PANIC(); } if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); /* Mark dag as done */ (node->dagHdr)->done=RF_TRUE; raidPtr->dags_in_flight--; } else { PropagateResults(node, context); FireNodeArray(node->numSuccedents, node->succedents); } break; case rf_rollBackward : if (node->numAntecedents == 0) { /* reached head of dag, we're done */ if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { rf_get_threadid(tid); printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); RF_PANIC(); } if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); /* Mark dag as done */ (node->dagHdr)->done=RF_TRUE; raidPtr->dags_in_flight--; } else { PropagateResults(node, context); FireNodeArray(node->numAntecedents, node->antecedents); } break; default : RF_PANIC(); break; } #else /* SIMULATE */ /* enqueue node's succedents (antecedents if rollBackward) for execution */ PropagateResults(node, context); #endif /* SIMULATE */ } /* user context or dag-exec-thread context: * This is the first step in post-processing a newly-completed node. * This routine is called by each node execution function to mark the node * as complete and fire off any successors that have been enabled. */ int rf_FinishNode( RF_DagNode_t *node, int context) { /* as far as I can tell, retcode is not used -wvcii */ int retcode = RF_FALSE; node->dagHdr->numNodesCompleted++; ProcessNode(node, context); #ifdef SIMULATE if ((node->dagHdr)->done == RF_TRUE) retcode = RF_TRUE; #endif /* SIMULATE */ return(retcode); } /* user context: * submit dag for execution, return non-zero if we have to wait for completion. 
* if and only if we return non-zero, we'll cause cbFunc to get invoked with * cbArg when the DAG has completed. * * for now we always return 1. If the DAG does not cause any I/O, then the callback * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess * to handle this. * * All we do here is fire the direct successors of the header node. The * DAG execution thread does the rest of the dag processing. */ int rf_DispatchDAG( RF_DagHeader_t *dag, void (*cbFunc)(void *), void *cbArg) { RF_Raid_t *raidPtr; int tid; raidPtr = dag->raidPtr; if (dag->tracerec) { RF_ETIMER_START(dag->tracerec->timer); } if (rf_engineDebug || rf_validateDAGDebug) { if (rf_ValidateDAG(dag)) RF_PANIC(); } if (rf_engineDebug) { rf_get_threadid(tid); printf("[%d] Entering DispatchDAG\n",tid); } raidPtr->dags_in_flight++; /* debug only: blow off proper locking */ dag->cbFunc = cbFunc; dag->cbArg = cbArg; dag->numNodesCompleted = 0; dag->status = rf_enable; FireNodeArray(dag->numSuccedents, dag->succedents); return(1); } /* dedicated kernel thread: * the thread that handles all DAG node firing. * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the * node queue to NULL before doing any firing of nodes. This way we only have to release the * lock once. Of course, it's probably rare that there's more than one node in the queue at * any one time, but it sometimes happens. * * In the kernel, this thread runs at spl0 and is not swappable. I copied these * characteristics from the aio_completion_thread. 
*/ #ifndef SIMULATE static void DAGExecutionThread(RF_ThreadArg_t arg) { RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; RF_Raid_t *raidPtr; int ks, tid; int s; #if !defined(__NetBSD__) && !defined(__OpenBSD__) RF_Thread_t thread; #endif raidPtr = (RF_Raid_t *)arg; rf_assign_threadid(); if (rf_engineDebug) { rf_get_threadid(tid); printf("[%d] Engine thread is running\n", tid); } #ifdef KERNEL #if !defined(__NetBSD__) && !defined(__OpenBSD__) thread = current_thread(); thread_swappable(thread, RF_FALSE); thread->priority = thread->sched_pri = BASEPRI_SYSTEM; s = spl0(); #endif /* XXX what to put here XXX */ s=splbio(); #endif /* KERNEL */ RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); DO_LOCK(raidPtr); while (!raidPtr->shutdown_engine) { while (raidPtr->node_queue != NULL) { local_nq = raidPtr->node_queue; fire_nq = NULL; term_nq = NULL; raidPtr->node_queue = NULL; DO_UNLOCK(raidPtr); /* first, strip out the terminal nodes */ while (local_nq) { nd = local_nq; local_nq = local_nq->next; switch(nd->dagHdr->status) { case rf_enable : case rf_rollForward : if (nd->numSuccedents == 0) { /* end of the dag, add to callback list */ nd->next = term_nq; term_nq = nd; } else { /* not the end, add to the fire queue */ nd->next = fire_nq; fire_nq = nd; } break; case rf_rollBackward : if (nd->numAntecedents == 0) { /* end of the dag, add to the callback list */ nd->next = term_nq; term_nq = nd; } else { /* not the end, add to the fire queue */ nd->next = fire_nq; fire_nq = nd; } break; default : RF_PANIC(); break; } } /* execute callback of dags which have reached the terminal node */ while (term_nq) { nd = term_nq; term_nq = term_nq->next; nd->next = NULL; (nd->dagHdr->cbFunc)(nd->dagHdr->cbArg); raidPtr->dags_in_flight--; /* debug only */ } /* fire remaining nodes */ FireNodeList(fire_nq); DO_LOCK(raidPtr); } while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) DO_WAIT(raidPtr); } DO_UNLOCK(raidPtr); RF_THREADGROUP_DONE(&raidPtr->engine_tg); #ifdef KERNEL #if 
defined(__NetBSD__) || defined(__OpenBSD__) splx(s); kthread_exit(0); #else splx(s); thread_terminate(thread); thread_halt_self(); #endif #endif /* KERNEL */ } #endif /* !SIMULATE */