// PupBSPProt.bcpl -- Byte Stream Protocol
// companion files are PupBSPStreams.bcpl and PupBSPa.asm
// Copyright Xerox Corporation 1979, 1981

// Last modified November 21, 1981  10:49 AM by Taft

get "Pup.decl"
get "PupRTPInternal.decl"

external
[
//outgoing procedures
BSPPupProc; BSPTimerProc; SendAck; TransmitStrategy; SetTimeout; SearchTQ

//incoming procedures
BSPHandleUncommonPup; RTPFilter; RTPFSM
ReleasePBI; GetPBI; CompletePup; SetPupID
Zero; MultEq; DoubleDifference; DoubleIncrement
SetTimer; TimerHasExpired
Enqueue; Dequeue; Unqueue; InsertAfter; FlushQueue
EnableInterrupts; DisableInterrupts; Min; Max

//statics
maxPupDataBytes; overlapDataWithAck
]

// If overlapDataWithAck is true, TransmitStrategy will generate
// an AData before allocation is completely exhausted, thereby permitting
// continued sending during the AData/Ack round-trip time.
// This improves performance when the round-trip time is large.
// Unfortunately, it has disasterous effects on performance in certain
// situations because of lost packets caused by Ethernet interface
// turnaround latency in Altos and Novas.  We therefore do not enable
// this feature at the present time.

static [ overlapDataWithAck = false ]

//----------------------------------------------------------------------------
let BSPPupProc(pbi) be
//----------------------------------------------------------------------------
// Incoming Pups that are not RTP come here.  pbi is always disposed of,
// either here or by higher-level handler that we call.
[
let soc = pbi>>PBI.socket

// ignore pbi if socket's state is not Open, EndIn, or EndOut
if soc>>BSPSoc.state ls stateOpen % soc>>BSPSoc.state gr stateEndOut then
   [ ReleasePBI(pbi); return ]

switchon pbi>>PBI.pup.type into
   [
   case typeAMark:
   case typeAData:
      soc>>BSPSoc.ackPending = true
   case typeData:
   case typeMark:
      [
      // discard pbi unless all the following conditions are met:
      // from correct source port?
      unless RTPFilter(pbi, true, false) &

      // socket allocation sufficient?
       soc>>BSPSoc.numTPBI ne 0 & soc>>BSPSoc.numIPBI ne 0 &

      // pbi non-empty?
       pbi>>PBI.pup.length gr pupOvBytes &

      // all of pbi's bytes in allocation window?
       DoubleDifference(lv pbi>>PBI.pup.id, lv soc>>BSPSoc.rcvByteID) +
       pbi>>PBI.pup.length - (pupOvBytes+1) uls
       (soc>>BSPSoc.maxIPBI-1)*maxPupDataBytes &

      // non-duplicate?
       InsertInSequence(lv soc>>BSPSoc.bspIQ, pbi) do
         [ ReleasePBI(pbi); endcase ]  // discard

      while pbi ne 0 &
       MultEq(lv soc>>BSPSoc.rcvByteID, lv pbi>>PBI.pup.id) do
         [  // Filled in first hole, advance rcvByteID
         DoubleIncrement(lv soc>>BSPSoc.rcvByteID,
          pbi>>PBI.pup.length-pupOvBytes)
         soc>>BSPSoc.unReadPups = soc>>BSPSoc.unReadPups+1
         pbi = pbi!0
         ]

      SetTimer(lv soc>>BSPSoc.inactivityTimer, inactivityTimeout)
      endcase
      ]

   case typeAck:
      [
      unless RTPFilter(pbi, true, false) &
       DoubleDifference(lv pbi>>PBI.pup.id, lv soc>>BSPSoc.lastAckID) ge 0 do
         [ ReleasePBI(pbi); endcase ]  // wrong source or out of order

      if soc>>BSPSoc.ackRequested then
         [  // Update the adaptive timeout = 2 times the average round-trip
            // delay, exponentially aged over the last 8 samples.
         let t = nil; SetTimer(lv t, 0)
         compiletest alto
            ifso [ t = (t-soc>>BSPSoc.lastADataTime) lshift 3 +4 ]
            ifnot [ t = (t-soc>>BSPSoc.lastADataTime) lshift 1 +1 ]
         soc>>BSPSoc.aDataTimeout =
          (7*soc>>BSPSoc.aDataTimeout+Max(5, Min(1000, t))) rshift 3
         ]

      soc>>BSPSoc.lastAckID↑1 = pbi>>PBI.pup.id↑1
      soc>>BSPSoc.lastAckID↑2 = pbi>>PBI.pup.id↑2

// BSPPupProc (cont'd)

      // move all PBIs from bspTQ to a private queue
      let tQ = vec 1
      DisableInterrupts()
      tQ!0 = soc>>BSPSoc.bspTQ.head; tQ!1 = soc>>BSPSoc.bspTQ.tail
      soc>>BSPSoc.bspTQ.head = 0
      EnableInterrupts()

      // decide whether to release, keep, or retransmit each pbi
      let rTQ = vec 1; rTQ!0 = 0  // q of pbis to retransmit
         [ // repeat
         let npbi = Dequeue(tQ); if npbi eq 0 break
         if npbi>>PBI.pup.type eq typeInterrupt then
            [ CompletePup(npbi); loop ]  // Retransmit interrupts
         test DoubleDifference(lv soc>>BSPSoc.lastAckID,
          lv npbi>>PBI.pup.id) ge npbi>>PBI.pup.length-pupOvBytes
            ifso
               [  // This pbi has been acked, release it
               soc>>BSPSoc.unAckedPups = soc>>BSPSoc.unAckedPups-1
               soc>>BSPSoc.unAckedBytes = soc>>BSPSoc.unAckedBytes-
                  (npbi>>PBI.pup.length-pupOvBytes)
               ReleasePBI(npbi)
               soc>>BSPSoc.pupAllocCount = soc>>BSPSoc.pupAllocCount+1
               ]
            ifnot
               [  // pbi not yet acked, decide whether to retransmit
               test soc>>BSPSoc.aDataCount-npbi>>PBI.timer gr 0 &
                soc>>BSPSoc.ackRequested
                  ifso  // Yes, retain for retransmission
                     [
                     TransmitStrategy(npbi, false)
                     InsertInSequence(rTQ, npbi) // can't be duplicate
                     ]
                  ifnot  // No, just put back on bspTQ
                     Enqueue(lv soc>>BSPSoc.bspTQ, npbi)
               ]
         ] repeat

      // Update allocations
      soc>>BSPSoc.ackRequested = false
      if soc>>BSPSoc.pupAllocCount ge pupAllocHysteresis then
         [  // Time to attempt to increase maxPupAlloc
         soc>>BSPSoc.pupAllocCount = 0
         soc>>BSPSoc.maxPupAlloc =
          Min(soc>>BSPSoc.maxPupAlloc+1, soc>>BSPSoc.maxOPBI)
         ]
      soc>>BSPSoc.bytesPerPup = Min(pbi>>PBI.pup.words↑1, maxPupDataBytes)
      soc>>BSPSoc.pupAlloc = Min(soc>>BSPSoc.maxPupAlloc,
       pbi>>PBI.pup.words↑2 - soc>>BSPSoc.unAckedPups)
      soc>>BSPSoc.byteAlloc = pbi>>PBI.pup.words↑3 - soc>>BSPSoc.unAckedBytes
      ReleasePBI(pbi)

      // if we have any PBIs to retransmit, do so now
      if rTQ!0 ne 0 then
         [
         TransmitStrategy(rTQ!1)  // Maybe turn tail into AData
         while rTQ!0 ne 0 do CompletePup(Dequeue(rTQ))
         ]

      SetTimeout(soc)
      SetTimer(lv soc>>BSPSoc.inactivityTimer, inactivityTimeout)
      endcase
      ]

// BSPPupProc (cont'd)

   case typeInterrupt:
   case typeInterruptReply:
   case typeEnd:
      BSPHandleUncommonPup(soc, pbi)  // it always disposes of pbi
      endcase

   case typeError:
      if pbi>>PBI.pup.words↑11 eq 3 %  // port IQ overflow
       pbi>>PBI.pup.words↑11 eq #1007 then  // gateway OQ overflow
         [
         // Got a congestion error.  Attempt to throttle back output
         // by decreasing the maximum outgoing Pup allocation.
         soc>>BSPSoc.maxPupAlloc = Max(soc>>BSPSoc.maxPupAlloc-1, 1)
         soc>>BSPSoc.pupAllocCount = 0
         ]
      // let higher-level handler look at it also

   default:
      // non-BSP Pups get passed to higher-level handler, which always
      // disposes of pbi
      (soc>>BSPSoc.bspOtherPupProc)(pbi)
   ]

if soc>>BSPSoc.ackPending then SendAck(soc)
]

//----------------------------------------------------------------------------
and InsertInSequence(queue, pbi) = valof
//----------------------------------------------------------------------------
// Inserts pbi into the proper place in queue to maintain ascending order of
// Pup IDs.  Queue must already be in order.
// Returns true normally; false if the pbi's ID duplicates one already in
// the queue.
[
// Test first for the most common case of the new pbi going on the end of
// the queue.  If it doesn't belong there, then initiate a search from the
// front of the queue for the right place to insert it.
test queue!0 eq 0 %
 DoubleDifference(lv pbi>>PBI.pup.id, lv (queue!1)>>PBI.pup.id) gr 0
   ifso Enqueue(queue, pbi)
   ifnot
      [ // Search queue for right place to insert new pbi
      let pred = queue
      let d = nil

      // assert: the queue is non-empty and pbi's ID <= the tail item's ID.
      // Therefore this loop is guaranteed to terminate.
         [ // repeat
         d = DoubleDifference(lv pbi>>PBI.pup.id, lv (pred!0)>>PBI.pup.id)
         if d le 0 then break
         pred = pred!0
         ] repeat

      if d eq 0 then resultis false  // duplicate
      InsertAfter(queue, pred, pbi)
      ]

resultis true
]

//----------------------------------------------------------------------------
and BSPTimerProc(soc) be
//----------------------------------------------------------------------------
// Called whenever the socket bspTimer expires
[
// do nothing if socket's state is not Open, EndIn, or EndOut
unless soc>>BSPSoc.state ls stateOpen % soc>>BSPSoc.state gr stateEndOut do
   [
   unless soc>>BSPSoc.noInactivityTimeout do
      if TimerHasExpired(lv soc>>BSPSoc.inactivityTimer) then
         [ RTPFSM(soc, CLST, 0); return ]  // Connection has died, abort it

   if soc>>BSPSoc.ackPending then SendAck(soc)
   if soc>>BSPSoc.interruptOut then
      [
      let pbi = SearchTQ(soc, true)  // Search for outstanding interrupt
      if pbi ne 0 then CompletePup(pbi)  // Retransmit if found one
      ]

   // Generate AData probe unconditionally.  If we have no data to move,
   // probe once every 15 seconds to ensure that the connection is alive.
   let pbi = GetPBI(soc, true)
   test pbi ne 0
      ifso
         [  // Probe with zero-length AData if can get pbi
         unless soc>>BSPSoc.ackRequested do
            SetTimer(lv soc>>BSPSoc.lastADataTime, 0)
         soc>>BSPSoc.ackRequested = true
         soc>>BSPSoc.aDataCount = soc>>BSPSoc.aDataCount+1
         SetPupID(pbi, lv soc>>BSPSoc.xmitByteID)
         CompletePup(pbi, typeAData, pupOvBytes)
         ]
      ifnot
         [  // Probe with a pbi on the TQ otherwise
         pbi = SearchTQ(soc, false)  // Look for a non-interrupt
         if pbi ne 0 then
            [ TransmitStrategy(pbi, true); CompletePup(pbi) ]
         ]
   ]

SetTimeout(soc)
]

//----------------------------------------------------------------------------
and SendAck(soc) be
//----------------------------------------------------------------------------
[
let pbi = GetPBI(soc, true); if pbi ne 0 then
   [
   soc>>BSPSoc.ackPending = false
   SetPupID(pbi, lv soc>>BSPSoc.rcvByteID)
   pbi>>PBI.pup.words↑1 = maxPupDataBytes  // Bytes per pup
   pbi>>PBI.pup.words↑2 =  // Pup allocation
      soc>>BSPSoc.maxIPBI-1-soc>>BSPSoc.unReadPups
   pbi>>PBI.pup.words↑3 =  // Byte allocation
      maxPupDataBytes*pbi>>PBI.pup.words↑2
   soc>>BSPSoc.sentZeroAlloc = pbi>>PBI.pup.words↑2 eq 0
   CompletePup(pbi, typeAck, pupOvBytes+6)

   // Flush received data packets we aren't acknowledging, since
   // we know they will be retransmitted and we don't want the
   // retransmissions to overflow our socket IQ if we are slow
   // in processing them.
   while soc>>BSPSoc.bspIQ.head ne 0 do
      [
      pbi = soc>>BSPSoc.bspIQ.tail
      if DoubleDifference(lv pbi>>PBI.pup.id,
       lv soc>>BSPSoc.rcvByteID) ls 0 then break
      Unqueue(lv soc>>BSPSoc.bspIQ, pbi)
      ReleasePBI(pbi)
      ]
   ]
]

//----------------------------------------------------------------------------
and SearchTQ(soc, lookForInterrupt) = valof
//----------------------------------------------------------------------------
// Unqueues and returns the last pbi on the BSP TQ, looking for
// an Interrupt if lookForInterrupt is true, anything else if false.
// We return the last pbi (rather than the first) so that retransmitting it
// leaves the order of pbis on the TQ unchanged.  Returns zero if none found.
[
let rpbi = 0
let pbi = soc>>BSPSoc.bspTQ.head
while pbi ne 0 do
   [
   if (pbi>>PBI.pup.type eq typeInterrupt) eqv lookForInterrupt then
      [ rpbi = pbi; break ]
   pbi = pbi!0
   ]
if rpbi ne 0 then Unqueue(lv soc>>BSPSoc.bspTQ, rpbi)
resultis rpbi
]

//----------------------------------------------------------------------------
and TransmitStrategy(pbi, makeA; numargs na) be
//----------------------------------------------------------------------------
// Diddles type depending on makeA or allocation remaining.
// Current algorithm is that we send AData when allocation
// falls below 33% of that given in the last received Ack.
// This may be expressed as:
//   pupAlloc le k * (pupAlloc + unAckedPups)
// where k = 0.33 and it is assumed that pupAlloc+unAckedPups
// gives the original Pup allocation.  This is equivalent to
//   (1-k)/k * pupAlloc le unAckedPups.
// For k = 0.33, (1-k)/k = 2.  This is the derivation of the
// comparison "pupAlloc lshift 1 le soc>>BSPSoc.unAckedPups" below.
[
let soc = pbi>>PBI.socket
unless na gr 1 & makeA do
   test soc>>BSPSoc.ackRequested
      ifso makeA = false
      ifnot
         [
         let pupAlloc = Min(Min(Min(soc>>BSPSoc.pupAlloc,
            soc>>BSPSoc.byteAlloc/soc>>BSPSoc.bytesPerPup),
            soc>>BSPSoc.numOPBI-1), soc>>BSPSoc.numTPBI-1)
         makeA = (overlapDataWithAck?
            pupAlloc lshift 1 le soc>>BSPSoc.unAckedPups,
            pupAlloc le 0)
         ]
if makeA then
   [
   // Unless an AData is already outstanding, start measuring
   // the round-trip delay for computing the adaptive timeout.
   unless soc>>BSPSoc.ackRequested do
      SetTimer(lv soc>>BSPSoc.lastADataTime, 0)
   soc>>BSPSoc.ackRequested = true
   soc>>BSPSoc.aDataCount = soc>>BSPSoc.aDataCount+1
   ]
pbi>>PBI.pup.type = selecton pbi>>PBI.pup.type into
   [
   case typeData:
   case typeAData: makeA? typeAData, typeData
   case typeMark:
   case typeAMark: makeA? typeAMark, typeMark
   ]
]

//----------------------------------------------------------------------------
and SetTimeout(soc) be
//----------------------------------------------------------------------------
SetTimer(lv soc>>BSPSoc.bspTimer, (soc>>BSPSoc.interruptOut %
 soc>>BSPSoc.unAckedPups gr 0 % soc>>BSPSoc.pupAlloc le 0 %
 soc>>BSPSoc.byteAlloc le 0 % soc>>BSPSoc.bytesPerPup le 0) ?
  (soc>>BSPSoc.ackRequested? soc>>BSPSoc.aDataTimeout,
   Max(soc>>BSPSoc.aDataTimeout, outstandingDataTimeout)),
  idleTimeout)