// IfsRSMgrSwap.bcpl - Rendezvous Socket and Process Manager
// Copyright Xerox Corporation 1979, 1980, 1981, 1982
// Last modified September 17, 1982  10:45 AM by Taft

get "Pup.decl"
get "IfsRs.decl"
get "Ifs.decl"

external
[
// outgoing procedures
HandleRSPup; DistributePBIs; DeclarePupSoc
CreateJob; DestroyJob; ActiveJobs; JobOK
FlushVitalState; FlushBuffersCtx

// incoming procedures
RSSendAbort; AdjustPBIs; PerformIdleTasks
Allocate; Free; Enqueue; Dequeue; Unqueue; InsertAfter
Zero; MoveBlock; Max; Min; MultEq; DefaultArgs; Block
SetAllocation; ExchangePorts; CompletePup; AppendStringToPup
ReleasePBI; OpenLevel1Socket; OpenRTPSocket; CreateBSPStream
FlushKDs; FlushBTreeState; FlushBuffers; MarkDirtyVFiles
FreePointer; InitializeContext; QueueEvent; CreateEvent
CreateCtx; DestroyCtx
DisableInterrupts; EnableInterrupts; Lock; Unlock; DoubleIncrement

// outgoing statics
entFlag; haltFlag; rsQ; jobT; ovPBIQ; numOverflowPBIs; maxOverflowPBIs
maxJobs; rsLock; lenJobT; numPBIs; jpc

// incoming statics
ifsCtxQ; sysZone
numNets; pbiFreeQ; lenPBI; CtxRunning; openLock
]

static
[
entFlag = true		// Network connections are allowed
haltFlag = false	// Halt the system after all jobs are killed
rsLock			// Lock for CreateJob and DistributePBIs
maxJobs			// max # class 0 jobs (FTP+MTP+Telnet+Leaf+CopyDisk)
jpc = 0			// job policy controls
rsQ; jobT; numOverflowPBIs; maxOverflowPBIs; ovPBIQ
lenJobT; numPBIs
]

manifest jobSS = 1024	// size (words) of each job's context/stack block

//----------------------------------------------------------------------------
let HandleRSPup(rs) be
//----------------------------------------------------------------------------
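// Called when a Pup arrives on rendezvous socket rs.  Discards anything that
// is not a well-formed RFC, refuses the connection if IFS is not accepting
// connections or cannot create another job, re-answers duplicate RFCs for
// existing connections, and otherwise creates a new job with a BSP stream
// to service the connection.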
[
let pbi = Dequeue(lv rs>>RS.soc>>PupSoc.iQ)
if pbi>>PBI.pup.type ne typeRFC % pbi>>PBI.pup.dPort.host eq 0 then
   [ ReleasePBI(pbi); return ]
unless entFlag do
   [ RSSendAbort(pbi, "IFS is not available"); return ]

// Default net field of connection port
if pbi>>PBI.pup.bytes↑1 eq 0 then
   pbi>>PBI.pup.bytes↑1 = pbi>>PBI.pup.sPort.net

// Check jobT for duplicate entry
for i = 0 to lenJobT-1 do
   if jobT>>JobT↑i ne 0 then
      [
      let soc = jobT>>JobT↑i>>RSCtx.bspSoc
      if soc ne 0 &
       MultEq(lv soc>>RTPSoc.frnPort, lv pbi>>PBI.pup.words, lenPort) &
       MultEq(lv soc>>RTPSoc.connID, lv pbi>>PBI.pup.id) then
         [ SendRFC(pbi, soc); return ]  //duplicate RFC
      ]

// Get a stack and initialize the new job
let ctx = CreateJob(rs>>RS.proc, rs>>RS.type, rs>>RS.extraSpace, true)
if ctx eq 0 then
   [
   RSSendAbort(pbi, "IFS is full - try later")
   DoubleIncrement(lv rs>>RS.numRefused)
   return
   ]
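// Accept the connection: open a BSP socket to the connection port supplied
// in the RFC, answer the RFC with our own connection port, and start the
// new job's context.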
let soc = Allocate(sysZone, lenBSPSoc)
OpenLevel1Socket(soc, 0, lv pbi>>PBI.pup.words)
ctx>>RSCtx.bspSoc = soc
DistributePBIs()
OpenRTPSocket(soc, 0, modeImmediateOpen, lv pbi>>PBI.pup.id)
ctx>>RSCtx.connFlag = true
CreateBSPStream(soc)
SendRFC(pbi, soc)
Enqueue(ifsCtxQ, ctx)
DoubleIncrement(lv rs>>RS.numAccepted)
]

//----------------------------------------------------------------------------
and SendRFC(pbi, soc) be
//----------------------------------------------------------------------------
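// Answers the RFC in pbi: fills the Pup contents with soc's local
// (connection) port and returns the Pup to its originator.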
[
MoveBlock(lv pbi>>PBI.pup.words, lv soc>>PupSoc.lclPort, lenPort)
ExchangePorts(pbi)
CompletePup(pbi, typeRFC, pupOvBytes+6)
]

//----------------------------------------------------------------------------
and DistributePBIs() be
//----------------------------------------------------------------------------
// Attempts to distribute the system PBIs equally among all active jobs.
// After doing its best, it checks two cases:
// 1) When a new job is created, it may not be possible to equally
//  redistribute the PBIs by reducing the allocations of the other
//  jobs.  When this happens, enough PBIs are allocated from sysZone
//  to prevent a deadlock.
// 2) If some PBIs were allocated in the past to prevent a deadlock, and
//  they are no longer needed (either because there are fewer jobs now,
//  or because some jobs are not using all of their allocations),
//  pbiFreeQ is searched for the extra PBIs, and they are released if found.
// This procedure assumes that it knows about all sockets that keep PBIs
//  for any length of time -- those that could participate in a deadlock.
[
Lock(rsLock, true)
let activeJobs = ActiveJobs()
let committedPBIs = numNets  //each net consumes 1 PBI permanently
if activeJobs ne 0 then
   [
   // The following ugly thing ensures that the SetAllocation procedure
   //  is in core, so that later calls to SetAllocation won't fault and
   //  let other processes sneak in and change the number of PBIs being used.
   let zPSIB = vec lenPSIB; Zero(zPSIB, lenPSIB)
   SetAllocation(zPSIB - offset PupSoc.psib/16, 0, 0, 0)

   // Each socket nominally gets its share of system PBIs, but at most 8:
   let inOutPBIs = Min((numPBIs-numNets)/activeJobs, 8)
   for i = 0 to lenJobT-1 do
      [
      let ctx = jobT>>JobT↑i
      if ctx ne 0 then
         [
         let soc = ctx>>RSCtx.bspSoc  //treated as a PupSoc
         if soc ne 0 then
            [
            // Independently compute actual input and output socket PBI
            // allocations.  Cannot decrease either allocation below
            // the number of PBIs actually in use plus one, unless the
            // socket is already using its maximum allocation.
            let iPBI = Max(Min(soc>>PupSoc.numIPBI+1,
             soc>>PupSoc.maxIPBI), inOutPBIs)
            let oPBI = Max(Min(soc>>PupSoc.numOPBI+1,
             soc>>PupSoc.maxOPBI), inOutPBIs)

            // Permit the socket one more than the max of its input and
            // output allocations to guarantee that one PBI can still
            // travel in the reverse direction.  However, it is not
            // necessary to count this in committedPBIs, since the BSP
            // guarantees never to hold onto that extra PBI.  (BSP uses
            // socket allocation minus one in both directions to control
            // BSP-level allocations.)
            let maxPBI = Max(iPBI, oPBI)
            SetAllocation(soc, maxPBI+1, iPBI, oPBI)
            committedPBIs = committedPBIs + maxPBI
            ]
         ]
      ]
   ]

// If now overcommitted, allocate new PBIs and add to pool.
// If now undercommitted, try to get rid of overflow PBIs.
if committedPBIs gr numPBIs+numOverflowPBIs %
 committedPBIs ls numPBIs+numOverflowPBIs & numOverflowPBIs gr 0 then
   AdjustPBIs(committedPBIs)

Unlock(rsLock)
]

//----------------------------------------------------------------------------
and CreateJob(Proc, type, extraSpace, dontStart; numargs na) = valof
//----------------------------------------------------------------------------
// Creates a job of type type and inserts it in the job table.
// Starts the context unless dontStart is true (default false).
// Returns a pointer to the context normally.
// Returns zero if the job table is full or too many jobs of
// the same type already exist or a stack can't be allocated.
[
DefaultArgs(lv na, -2, lenRSCtx-3, false)
let ctx = 0
Lock(rsLock, true)
if JobOK(type, true) then
   for i = 0 to lenJobT-1 do
      if jobT>>JobT↑i eq 0 then
         [
         ctx = Allocate(sysZone, jobSS, true)
         if ctx ne 0 then
            [
            Zero(ctx, extraSpace+3)
            InitializeContext(ctx, jobSS, Proc, extraSpace)
            ctx>>RSCtx.type = type
            jobT>>JobT↑i = ctx
            unless dontStart do Enqueue(ifsCtxQ, ctx)
            ]
         break
         ]
Unlock(rsLock)
resultis ctx
]

//----------------------------------------------------------------------------
and DestroyJob() be
//----------------------------------------------------------------------------
// Procedure called by a job to terminate itself.  Does not return.
[
FreePointer(lv CtxRunning>>RSCtx.bspSoc)
DistributePBIs()

// The following depends on the fact that
// (1) process stacks are page-size buffers, and freeing one doesn't
//     immediately clobber its contents;
// (2) calling Unqueue and Free can't cause us to Block.
for i = 0 to lenJobT-1 do
   if jobT>>JobT↑i eq CtxRunning then jobT>>JobT↑i = 0
Unqueue(ifsCtxQ, CtxRunning)
Free(sysZone, CtxRunning)
CtxRunning!0 = 0  // so CallContextList will start over -- CtxRunning has no successor!
Block()  // never to return
]

//----------------------------------------------------------------------------
and ActiveJobs() = valof
//----------------------------------------------------------------------------
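// Returns the number of jobs currently present in the job table.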
[
let activeJobs = 0
for i = 0 to lenJobT-1 do if jobT>>JobT↑i ne 0 then
   activeJobs = activeJobs+1
resultis activeJobs
]

//----------------------------------------------------------------------------
and DeclarePupSoc(soc) be
//----------------------------------------------------------------------------
// Procedure called by a process to declare a Pup socket it intends to use
// that might demand some PBI allocation.  Pass soc = 0 to remove the current
// socket; this must be done before calling DestroyJob if the socket was not
// allocated from sysZone, since DestroyJob frees the current socket into
// sysZone.
[
CtxRunning>>RSCtx.bspSoc = soc
DistributePBIs()
]

//----------------------------------------------------------------------------
and JobOK(type, new; numargs na) = valof
//----------------------------------------------------------------------------
// Returns true iff it is ok for a job of the specified type to run now.
// new = true means you are talking about creating a new job of that type;
// false (the default) means you are talking about continuing to run
// an existing job.
// See IfsRsMgrInit.bcpl for job policy controls currently in force.
[
if haltFlag resultis false

let class = jpc>>JPC.typeClass↑type
let totalJobs = na gr 1 & new? 1, 0
let numJobsInClass = totalJobs
for i = 0 to lenJobT-1 do
   if jobT>>JobT↑i ne 0 then
      [
      totalJobs = totalJobs+1
      if jpc>>JPC.typeClass↑(jobT>>JobT↑i>>RSCtx.type) eq class then
         numJobsInClass = numJobsInClass+1
      ]
resultis numJobsInClass le jpc>>JPC.classMax↑class &
 totalJobs le jpc>>JPC.classSystemMax↑class
]

//----------------------------------------------------------------------------
and FlushVitalState(ecb) be
//----------------------------------------------------------------------------
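// Event procedure, invoked periodically, that flushes vital disk state to
// the disk and then forks a separate context to write out dirty VMem buffers.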
[
// Lock out changes in disk configuration while flushing disk state
if Lock(openLock, true, true) then
   [
   FlushKDs()
   MarkDirtyVFiles()
   FlushBTreeState()
   Unlock(openLock)
   ]

DistributePBIs()

// If there's nothing else to do....
if ActiveJobs() eq 0 then PerformIdleTasks()

// Fork a separate context to do the actual VMem flush to disk, because it
// can take a long time, and we don't want to tie up the EventMgr context.
// n.b. Stack must be big enough for the FlushBuffers path through VMem plus
// the deepest DOPAGEIO call path.  All DOPAGEIO procedures besides OvDOPAGEIO
// use CallProc to actually do the disk I/O; and OvDOPAGEIO isn't a problem
// since overlays are never dirtied and never need to be written out.
// Therefore a relatively small stack suffices.
// Pass ecb to the FlushBuffersCtx and let it do the QueueEvent for the next
// FlushVitalState when it is done -- so that there cannot possibly be multiple
// instances of FlushBuffersCtx in existence simultaneously.
CreateCtx(200, FlushBuffersCtx)!3 = ecb
]

//----------------------------------------------------------------------------
and FlushBuffersCtx(ctx) be
//----------------------------------------------------------------------------
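// Body of the context forked by FlushVitalState: flushes VMem buffers to
// disk, schedules the next FlushVitalState, and destroys itself.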
[
let ecb = ctx!3
FlushBuffers()
QueueEvent(ecb, 1500)  // 15 seconds to next FlushVitalState
DestroyCtx()
]