// IfsRSMgrSwap.bcpl - Rendezvous Socket and Process Manager
// Copyright Xerox Corporation 1979, 1980, 1981, 1982
// Last modified September 17, 1982  10:45 AM by Taft

// Overview (derived from the code in this file):
//   - HandleRSPup answers an RFC Pup arriving on a rendezvous socket by
//     creating a job (a context recorded in jobT) and a socket for it.
//   - CreateJob / DestroyJob / ActiveJobs / JobOK maintain and query the
//     job table jobT (lenJobT entries; an entry of 0 means "free slot").
//   - DistributePBIs / DeclarePupSoc recompute the per-socket PBI
//     allocations whenever the set of jobs or sockets changes.
//   - FlushVitalState / FlushBuffersCtx periodically flush state to disk.

get "Pup.decl"
get "IfsRs.decl"
get "Ifs.decl"

external
[
// outgoing procedures
HandleRSPup; DistributePBIs; DeclarePupSoc
CreateJob; DestroyJob; ActiveJobs; JobOK
FlushVitalState; FlushBuffersCtx

// incoming procedures
RSSendAbort; AdjustPBIs; PerformIdleTasks
Allocate; Free; Enqueue; Dequeue; Unqueue; InsertAfter
Zero; MoveBlock; Max; Min; MultEq; DefaultArgs; Block
SetAllocation; ExchangePorts; CompletePup; AppendStringToPup
ReleasePBI; OpenLevel1Socket; OpenRTPSocket; CreateBSPStream
FlushKDs; FlushBTreeState; FlushBuffers; MarkDirtyVFiles
FreePointer; InitializeContext; QueueEvent; CreateEvent
CreateCtx; DestroyCtx
DisableInterrupts; EnableInterrupts; Lock; Unlock; DoubleIncrement

// outgoing statics
entFlag; haltFlag; rsQ; jobT; ovPBIQ; numOverflowPBIs; maxOverflowPBIs
maxJobs; rsLock; lenJobT; numPBIs; jpc

// incoming statics
ifsCtxQ; sysZone
numNets; pbiFreeQ; lenPBI; CtxRunning; openLock
]

static
[
entFlag = true		// Network connections are allowed
haltFlag = false	// Halt the system after all jobs are killed
rsLock			// Lock for CreateJob and DistributePBIs
maxJobs			// max # class 0 jobs (FTP+MTP+Telnet+Leaf+CopyDisk)
jpc = 0			// job policy controls
rsQ; jobT; numOverflowPBIs; maxOverflowPBIs; ovPBIQ
lenJobT; numPBIs
]

// Size, in words, passed to Allocate and InitializeContext for each job's
// context block (see CreateJob).
manifest jobSS = 1024

//----------------------------------------------------------------------------
let HandleRSPup(rs) be
//----------------------------------------------------------------------------
// Handles one Pup taken from the input queue of rendezvous socket rs.
// Anything other than an RFC addressed to a nonzero destination host is
// released.  An RFC is refused with an Abort if entFlag is false or if
// CreateJob cannot supply a job; a retransmitted RFC for a connection that
// already exists is simply answered again.  Otherwise a new job and socket
// are created, the RFC is answered, and the job's context is started.
// NOTE(review): pbi is used without testing for 0 -- presumably the caller
// only invokes this when rs's input queue is non-empty; confirm.
[
let pbi = Dequeue(lv rs>>RS.soc>>PupSoc.iQ)
if pbi>>PBI.pup.type ne typeRFC % pbi>>PBI.pup.dPort.host eq 0 then
   [ ReleasePBI(pbi); return ]
// NOTE(review): pbi is not released here after RSSendAbort, so RSSendAbort
// presumably takes over the PBI -- confirm.
unless entFlag do
   [ RSSendAbort(pbi, "IFS is not available"); return ]

// Default net field of connection port
// (a zero first content byte is replaced by the sender's source net)
if pbi>>PBI.pup.bytes^1 eq 0 then
   pbi>>PBI.pup.bytes^1 = pbi>>PBI.pup.sPort.net

// Check jobT for duplicate entry
// An existing job matches when both its socket's foreign port and its
// connection ID equal those carried by this RFC.
for i = 0 to lenJobT-1 do
   if jobT>>JobT^i ne 0 then
      [
      let soc = jobT>>JobT^i>>RSCtx.bspSoc
      if soc ne 0 &
       MultEq(lv soc>>RTPSoc.frnPort, lv pbi>>PBI.pup.words, lenPort) &
       MultEq(lv soc>>RTPSoc.connID, lv pbi>>PBI.pup.id) then
         [ SendRFC(pbi, soc); return ]  //duplicate RFC
      ]

// Get a stack and initialize the new job
// dontStart = true: the context is not put on ifsCtxQ until the socket has
// been fully set up (see the Enqueue below).
let ctx = CreateJob(rs>>RS.proc, rs>>RS.type, rs>>RS.extraSpace, true)
if ctx eq 0 then
   [
   RSSendAbort(pbi, "IFS is full - try later")
   DoubleIncrement(lv rs>>RS.numRefused)
   return
   ]

// Build the connection socket and record it in the job before calling
// DistributePBIs, so that the new socket takes part in the redistribution.
let soc = Allocate(sysZone, lenBSPSoc)
OpenLevel1Socket(soc, 0, lv pbi>>PBI.pup.words)
ctx>>RSCtx.bspSoc = soc
DistributePBIs()
OpenRTPSocket(soc, 0, modeImmediateOpen, lv pbi>>PBI.pup.id)
ctx>>RSCtx.connFlag = true
CreateBSPStream(soc)
SendRFC(pbi, soc)
Enqueue(ifsCtxQ, ctx)  // start the job
DoubleIncrement(lv rs>>RS.numAccepted)
]

//----------------------------------------------------------------------------
and SendRFC(pbi, soc) be
//----------------------------------------------------------------------------
// Answers the RFC held in pbi on behalf of socket soc: copies soc's local
// port (lenPort words) into the Pup contents, calls ExchangePorts, and
// completes the Pup as type RFC with length pupOvBytes+6.
// NOTE(review): the +6 is presumably the byte length of one port -- confirm.
[
MoveBlock(lv pbi>>PBI.pup.words, lv soc>>PupSoc.lclPort, lenPort)
ExchangePorts(pbi)
CompletePup(pbi, typeRFC, pupOvBytes+6)
]

//----------------------------------------------------------------------------
and DistributePBIs() be
//----------------------------------------------------------------------------
// Attempts to distribute the system PBIs equally among all active jobs.
// After doing its best, it checks two cases:
// 1) When a new job is created, it may not be possible to equally
//    redistribute the PBIs by reducing the allocations of the other
//    jobs.  When this happens, enough PBIs are allocated from sysZone
//    to prevent a deadlock.
// 2) If some PBIs were allocated in the past to prevent a deadlock, and
//    they are no longer needed (either because there are fewer jobs now,
//    or because some jobs are not using all of their allocations),
//    pbiFreeQ is searched for the extra PBIs, and they are released if found.
// This procedure assumes that it knows about all sockets that keep PBIs
// for any length of time -- those that could participate in a deadlock.
//
// The whole computation is performed while holding rsLock.  The actual
// growing/shrinking of the pool in cases (1) and (2) is delegated to
// AdjustPBIs, which is passed the number of PBIs now committed.
[
Lock(rsLock, true)
let activeJobs = ActiveJobs()
let committedPBIs = numNets  //each net consumes 1 PBI permanently
if activeJobs ne 0 then
   [
   // The following ugly thing ensures that the SetAllocation procedure
   // is in core, so that later calls to SetAllocation won't fault and
   // let other processes sneak in and change the number of PBIs being used.
   // (It is a dummy call on a zeroed scratch PSIB; the pointer arithmetic
   // presumably makes that vector appear as the psib field of a PupSoc,
   // with offset.../16 being the field's word offset -- confirm.)
   let zPSIB = vec lenPSIB; Zero(zPSIB, lenPSIB)
   SetAllocation(zPSIB - offset PupSoc.psib/16, 0, 0, 0)

   // Each socket nominally gets its share of system PBIs, but at most 8:
   let inOutPBIs = Min((numPBIs-numNets)/activeJobs, 8)
   for i = 0 to lenJobT-1 do
      [
      let ctx = jobT>>JobT^i
      if ctx ne 0 then
         [
         let soc = ctx>>RSCtx.bspSoc  //treated as a PupSoc
         if soc ne 0 then
            [
            // Independently compute actual input and output socket PBI
            // allocations.  Cannot decrease either allocation below
            // the number of PBIs actually in use plus one, unless the
            // socket is already using its maximum allocation.
            // (max-num is taken here as the count in use, so the inner
            // Min is "in use + 1, capped at the current maximum"; the
            // outer Max raises that to the nominal share if it is lower.)
            let iPBI = Max(Min(soc>>PupSoc.maxIPBI-soc>>PupSoc.numIPBI+1,
             soc>>PupSoc.maxIPBI), inOutPBIs)
            let oPBI = Max(Min(soc>>PupSoc.maxOPBI-soc>>PupSoc.numOPBI+1,
             soc>>PupSoc.maxOPBI), inOutPBIs)

            // Permit the socket one more than the max of its input and
            // output allocations to guarantee that one PBI can still
            // travel in the reverse direction.  However, it is not
            // necessary to count this in committedPBIs, since the BSP
            // guarantees never to hold onto that extra PBI.  (BSP uses
            // socket allocation minus one in both directions to control
            // BSP-level allocations.)
            let maxPBI = Max(iPBI, oPBI)
            SetAllocation(soc, maxPBI+1, iPBI, oPBI)
            committedPBIs = committedPBIs + maxPBI
            ]
         ]
      ]
   ]

// If now overcommitted, allocate new PBIs and add to pool.
// If now undercommitted, try to get rid of overflow PBIs.
// (Nothing is done when exactly committed, or when undercommitted with no
// overflow PBIs outstanding.)
if committedPBIs gr numPBIs+numOverflowPBIs %
 committedPBIs ls numPBIs+numOverflowPBIs & numOverflowPBIs gr 0 then
   AdjustPBIs(committedPBIs)
Unlock(rsLock)
]

//----------------------------------------------------------------------------
and CreateJob(Proc, type, extraSpace, dontStart; numargs na) = valof
//----------------------------------------------------------------------------
// Creates a job of type type and inserts it in the job table.
// Starts the context unless dontStart is true (default false).
// Returns a pointer to the context normally.
// Returns zero if the job table is full or too many jobs of
// the same type already exist or a stack can't be allocated.
//
// Arguments:
//   Proc        procedure handed to InitializeContext for the new context
//   type        job type; checked with JobOK and stored in RSCtx.type
//   extraSpace  passed to InitializeContext; defaults to lenRSCtx-3
//   dontStart   if true the caller must Enqueue the context on ifsCtxQ
// The table search and insertion are performed while holding rsLock.
[
// NOTE(review): the -2 presumably tells DefaultArgs that the first two
// arguments are mandatory and the remaining values are defaults for
// extraSpace and dontStart -- confirm against DefaultArgs.
DefaultArgs(lv na, -2, lenRSCtx-3, false)
let ctx = 0
Lock(rsLock, true)
if JobOK(type, true) then
   for i = 0 to lenJobT-1 do if jobT>>JobT^i eq 0 then
      [
      // First free slot found.  The third argument to Allocate presumably
      // makes it return 0 on failure (consistent with the test below).
      ctx = Allocate(sysZone, jobSS, true)
      if ctx ne 0 then
         [
         Zero(ctx, extraSpace+3)
         InitializeContext(ctx, jobSS, Proc, extraSpace)
         ctx>>RSCtx.type = type
         jobT>>JobT^i = ctx
         unless dontStart do Enqueue(ifsCtxQ, ctx)
         ]
      break  // only the first free slot is tried, even if Allocate failed
      ]
Unlock(rsLock)
resultis ctx
]

//----------------------------------------------------------------------------
and DestroyJob() be
//----------------------------------------------------------------------------
// Procedure called by a job to terminate itself.  Does not return.
// Frees the job's socket (if any), redistributes PBIs, removes the running
// context from jobT and ifsCtxQ, frees the context block, and Blocks.
[
FreePointer(lv CtxRunning>>RSCtx.bspSoc)
DistributePBIs()

// The following depends on the fact that
// (1) process stacks are page-size buffers, and freeing one doesn't
// immediately clobber its contents;
// (2) calling Unqueue and Free can't cause us to Block.
// Do not reorder or insert calls below: from the Free onward this code is
// running on (and writing into) a block that has already been freed.
for i = 0 to lenJobT-1 do
   if jobT>>JobT^i eq CtxRunning then jobT>>JobT^i = 0
Free(sysZone, CtxRunning)
Unqueue(ifsCtxQ, CtxRunning)
CtxRunning!0 = 0  // so CallContextList will start over -- CtxRunning has no successor!
Block()  // never to return
]

//----------------------------------------------------------------------------
and ActiveJobs() = valof
//----------------------------------------------------------------------------
// Returns the number of nonzero entries in jobT, i.e. the number of jobs
// currently in existence (whether or not they have a socket).
[
let activeJobs = 0
for i = 0 to lenJobT-1 do
   if jobT>>JobT^i ne 0 then activeJobs = activeJobs+1
resultis activeJobs
]

//----------------------------------------------------------------------------
and DeclarePupSoc(soc) be
//----------------------------------------------------------------------------
// Procedure called by a process to declare a Pup socket it intends to use
// that might demand some allocation.  Pass soc = 0 to remove the current
// socket (must do this before calling DestroyJob, as DestroyJob
// will try to free the current socket into sysZone otherwise).
// The socket is recorded in the running context's RSCtx.bspSoc, and the
// PBI allocations are then recomputed.
[
CtxRunning>>RSCtx.bspSoc = soc
DistributePBIs()
]

//----------------------------------------------------------------------------
and JobOK(type, new; numargs na) = valof
//----------------------------------------------------------------------------
// Returns true iff it is ok for a job of the specified type to run now.
// new = true means you are talking about creating a new job of that type;
// false (the default) means you are talking about continuing to run
// an existing job.
// See IfsRsMgrInit.bcpl for job policy controls currently in force.
//
// Always returns false while haltFlag is set.  Otherwise the job is ok iff
// the number of jobs in the same class does not exceed classMax for that
// class and the total number of jobs does not exceed classSystemMax for
// that class.  When new is true both counts start at 1, so the prospective
// job is included in the comparison.
[
if haltFlag resultis false
let class = jpc>>JPC.typeClass^type
// new is consulted only if the caller actually supplied it
let totalJobs = na gr 1 & new? 1, 0
let numJobsInClass = totalJobs
for i = 0 to lenJobT-1 do
   if jobT>>JobT^i ne 0 then
      [
      totalJobs = totalJobs+1
      if jpc>>JPC.typeClass^(jobT>>JobT^i>>RSCtx.type) eq class then
         numJobsInClass = numJobsInClass+1
      ]
resultis numJobsInClass le jpc>>JPC.classMax^class &
 totalJobs le jpc>>JPC.classSystemMax^class
]

//----------------------------------------------------------------------------
and FlushVitalState(ecb) be
//----------------------------------------------------------------------------
// Periodic housekeeping event.  Flushes disk-related state (if openLock can
// be obtained), recomputes PBI allocations, runs idle tasks when no jobs
// exist, and forks a context to flush VMem buffers.  ecb is handed to that
// context, which re-queues the event when it is done (see FlushBuffersCtx).
[
// Lock out changes in disk configuration while flushing disk state
// NOTE(review): the result of Lock is tested, so with these arguments it
// presumably returns false instead of waiting when the lock is unavailable;
// in that case this round of flushing is skipped -- confirm.
if Lock(openLock, true, true) then
   [
   FlushKDs()
   MarkDirtyVFiles()
   FlushBTreeState()
   Unlock(openLock)
   ]

DistributePBIs()

// If there's nothing else to do....
if ActiveJobs() eq 0 then PerformIdleTasks()

// Fork a separate context to do the actual VMem flush to disk, because it
// can take a long time, and we don't want to tie up the EventMgr context.
// n.b. Stack must be big enough for the FlushBuffers path through VMem plus
// the deepest DOPAGEIO call path.  All DOPAGEIO procedures besides OvDOPAGEIO
// use CallProc to actually do the disk I/O; and OvDOPAGEIO isn't a problem
// since overlays are never dirtied and never need to be written out.
// Therefore a relatively small stack suffices.
// Pass ecb to the FlushBuffersCtx and let it do the QueueEvent for the next
// FlushVitalState when it is done -- so that there cannot possibly be multiple
// instances of FlushBuffersCtx in existence simultaneously.
// (ecb is stored in word 3 of the new context, where FlushBuffersCtx
// reads it back.)
CreateCtx(200, FlushBuffersCtx)!3 = ecb
]

//----------------------------------------------------------------------------
and FlushBuffersCtx(ctx) be
//----------------------------------------------------------------------------
// Body of the context forked by FlushVitalState.  Flushes buffers, queues
// the event whose ecb was left in ctx!3 so that FlushVitalState runs again,
// and then destroys this context.
[
let ecb = ctx!3
FlushBuffers()
QueueEvent(ecb, 1500)  // 15 seconds to next FlushVitalState
DestroyCtx()
]