-- File: PupPktHot.mesa,  Last Edit: HGM  March 14, 1981  11:40 AM

DIRECTORY
  Inline USING [LowHalf],
  Process USING [SetTimeout, MsecToTicks],
  System USING [Pulses, GetClockPulses],
  StatsDefs USING [StatBump, StatIncr],
  CommFlags USING [doDebug, doStats],
  DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
  PupStream USING [StreamClosing],
  PupPktDefs,
  PupPktOps,
  PupDefs USING [
    Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer,
    ReturnFreePupBuffer, PupRouterSendThis],
  BufferDefs USING [ReturnFreeBuffer],
  PupTypes USING [PupAddress, maxDataBytesPerGatewayPup];

PupPktHot: MONITOR RETURNS [POINTER TO PupPktOps.InstanceData]LOCKS mine.lock
  IMPORTS
    Inline, Process, System, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs,
    BufferDefs
  EXPORTS PupPktOps =
  BEGIN OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs;

  -- All per-connection state for this instance of the module.  The monitor
  -- lock named in the module header (LOCKS mine.lock) lives inside this record,
  -- and most procedures below reach its fields through OPEN mine.
  mine: PupPktOps.InstanceData;

  -- Errors handed to Glitch by the procedures in this module.
  NeitherDataNorMark: PUBLIC ERROR = CODE; -- Get dequeued a packet that is not data/aData/mark/aMark
  BufferAlreadyRequeued: PUBLIC ERROR = CODE; -- Put was given a buffer whose requeueProcedure is not ReturnFreeBuffer
  StreamNotOpen: PUBLIC ERROR = CODE; -- a send was attempted while state is idle or halfOpen

  -- Signed difference a - b, clamped to [-maxInteger..maxInteger] so that the
  -- result always fits in a short INTEGER.
  -- Only called by InputPacket.
  Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] =
    BEGIN
    limit: INTEGER = 77777B;
    delta: LONG INTEGER ← a - b;
    -- saturate at either end, otherwise the low word holds the whole value
    IF delta > limit THEN RETURN[limit];
    IF delta < -limit THEN RETURN[-limit];
    RETURN[Inline.LowHalf[delta]];
    END;

  -- Body of the retransmitter loop (installed as mine.retransmitter by the
  -- module initialization).  Runs under the monitor lock, giving it up at each
  -- WAIT retransmitterReady.  Each pass looks at the connection state and:
  --   open               - starts pinging and/or probes for an ACK when idle too long
  --   talking, finishing - recycles sent buffers that have been ACKed and
  --                        probes/retransmits the oldest one after a timeout
  --   halfOpen, end      - resends the RFC or END control packet
  --   closed             - returns every buffer still on the sent queue
  -- Once pleaseDie is set it keeps draining until unackedPups reaches 0.
  Retransmitter: ENTRY PROCEDURE =
    -- Retransmit things which have not been acknowledged in a reasonable time.
    -- Such things include RFCs and ENDs as well as data.
    BEGIN OPEN mine;
    now: System.Pulses;
    UNTIL pleaseDie DO
      now ← System.GetClockPulses[];
      SELECT state FROM
	open =>
	  BEGIN
	  -- Quiet for longer than pingPulses and pinging enabled: zero the
	  -- allocation so that the probe test just below fires.
	  IF now - timer > pingPulses AND ping THEN
	    BEGIN
	    probeCounter ← pingRetransmissions;
	    allocatedPups ← 0; -- will start probing

	    END;
	  -- Probe while we have no allocation (or no ack has set outEnd yet).
	  IF now - timer > ctlRetransmitPulses AND
	    (outEnd = 0 OR allocatedPups = 0) THEN ProbeForAck[];
	  END;
	talking, finishing =>
	  BEGIN
	  DO
	    -- recycle things that have timed out
	    -- sentBuffer caches the head of sentQueue between passes.
	    IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
	    IF sentBuffer = NIL THEN EXIT;
	    -- Subtract-then-test-sign comparison of the two IDs.
	    IF ackedID - Flip[sentBuffer.pupID] > 0 THEN
	      BEGIN -- this packet has been ACKed already
	      IF (unackedPups ← unackedPups - 1) = 0 THEN
		BEGIN
		SELECT state FROM -- last packet has been ACKed

		  talking => state ← open;
		  finishing => state ← end;
		  ENDCASE;
		BROADCAST stateChange;
		END;
	      ReturnFreePupBuffer[sentBuffer];
	      sentBuffer ← NIL;
	      END
	    ELSE EXIT;
	    ENDLOOP;
	  -- Oldest unacked packet has waited too long: try a probe first.
	  IF sentBuffer # NIL AND now - timer > retransmitPulses THEN
	    BEGIN
	    ProbeForAck[];
	    -- ProbeForAck resets timer when it manages to send, so this test
	    -- re-examines whether the probe actually went out.
	    IF now - timer > retransmitPulses THEN
	      BEGIN -- couldn't get buffer, use one of ours
	      IF sentBuffer.pupType = data THEN sentBuffer.pupType ← aData;
	      timer ← System.GetClockPulses[];
	      PupRouterSendThis[sentBuffer];
	      IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
	      -- NOTE(review): the buffer just sent presumably comes back through
	      -- PutOnSentQueue (its requeueProcedure); confirm queue order is kept.
	      sentBuffer ← DequeuePup[@sentQueue];
	      END;
	    END;
	  END;
	halfOpen => IF now - timer > ctlRetransmitPulses THEN SendRfc[@mine];
	end => IF now - timer > ctlRetransmitPulses THEN SendEnd[@mine];
	closed =>
	  BEGIN
	  DO
	    -- flush anything left on the sentQueue
	    IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
	    IF sentBuffer = NIL THEN EXIT;
	    unackedPups ← unackedPups - 1;
	    ReturnFreePupBuffer[sentBuffer];
	    sentBuffer ← NIL;
	    ENDLOOP;
	  END;
	ENDCASE;
      -- An outstanding interrupt is resent on the control-packet interval.
      IF outIntPending AND now - outIntTime > ctlRetransmitPulses THEN
	SendInt[@mine];
      -- Releases the monitor lock until notified (or, presumably, until a
      -- timeout set elsewhere on retransmitterReady expires - confirm).
      WAIT retransmitterReady;
      ENDLOOP;
    -- Shutting down: do not return until every sent buffer has been recovered.
    UNTIL unackedPups = 0 DO
      DO
	-- flush anything left on the sentQueue
	IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
	IF sentBuffer = NIL THEN EXIT;
	unackedPups ← unackedPups - 1;
	ReturnFreePupBuffer[sentBuffer];
	sentBuffer ← NIL;
	ENDLOOP;
      -- If a buffer is on a device output queue, wait until it comes back.
      IF unackedPups # 0 THEN WAIT retransmitterReady;
      ENDLOOP;
    END;


  -- Send a header-only aData packet so that the other end answers with an ACK.
  -- Quietly does nothing when no free buffer can be had right now.  Counts the
  -- probe; too many in a row closes the stream (transmissionTimeout), and past
  -- probesBeforePanic the retransmit interval is doubled up to its maximum.
  ProbeForAck: INTERNAL PROCEDURE =
    BEGIN OPEN mine;
    probePup: PupBuffer ← MaybeGetFreePupBuffer[];
    IF probePup = NIL THEN RETURN;
    -- build the empty packet: nothing but a header stamped with the next output ID
    probePup.pupType ← aData;
    probePup.pupLength ← bytesPerPupHeader;
    probePup.pupID ← Flop[nextOutputID];
    probePup.source ← local;
    probePup.dest ← remote;
    -- restart the bookkeeping that the ack handler in InputPacket looks at
    clumpsSinceBump ← 0;
    aDataOut ← FALSE;
    timer ← System.GetClockPulses[];
    PupRouterSendThis[probePup];
    IF CommFlags.doStats THEN StatIncr[statProbesSent];
    probeCounter ← probeCounter + 1;
    IF probeCounter > retransmitionsBeforeAbort THEN
      SmashClosed[@mine, transmissionTimeout];
    IF probeCounter > probesBeforePanic THEN
      retransmitPulses ←
        [MIN[System.Pulses[2*retransmitPulses], maxRetransmitPulses]];
    END;

  -- Open the send window by one packet (never beyond myMaxAllocate) and scale
  -- retransmitPulses by the same ratio.  Always restarts the clumpsSinceBump
  -- count; leaves the window alone when retransmitPulses is already pinned at
  -- maxRetransmitPulses.
  -- INTERNAL: this updates monitor-protected fields of mine and must only be
  -- called with the monitor lock held (its one caller is ENTRY InputPacket),
  -- matching the declaration of its sibling ThrottleBack.
  ThrottleForward: INTERNAL PROCEDURE =
    BEGIN OPEN mine;
    old: CARDINAL ← pathMaxAllocate;
    clumpsSinceBump ← 0;
    -- We can actually get one packet ahead at this point.
    IF retransmitPulses = maxRetransmitPulses THEN RETURN;
    pathMaxAllocate ← MIN[pathMaxAllocate + 1, myMaxAllocate];
    -- stretch the timeout in proportion to the larger window
    retransmitPulses ← [(retransmitPulses*pathMaxAllocate)/old];
    END;

  -- Shrink the send window (pathMaxAllocate) by one packet per outstanding
  -- throttle request, stopping early at a window of 1 or when retransmitPulses
  -- is at its minimum.  Clears throttle and clumpsSinceBump on the way out.
  -- If the window is already 1 on entry, first pauses for maxRetransmitTime.
  ThrottleBack: INTERNAL PROCEDURE =
    BEGIN OPEN mine;
    before: CARDINAL;
    IF pathMaxAllocate = 1 THEN
      BEGIN
      -- This is a desperate attempt to avoid an instability
      -- It is/(was?) also a nasty bug under some strange case that Andrew found
      -- (WAIT on a private condition: just a timed pause that lets go of the monitor)
      pause: CONDITION;
      Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]];
      WAIT pause;
      END;
    WHILE throttle # 0 AND retransmitPulses # minRetransmitPulses AND
      pathMaxAllocate # 1 DO
      before ← pathMaxAllocate;
      pathMaxAllocate ← before - 1;
      -- Beware of rounding down, assume return ack takes as long as a send packet
      -- This goes unstable if much of the time is due to somebody else's packets
      --    retransmitPulses ← ((retransmitPulses+before)*before)/(before+1);
      throttle ← throttle - 1;
      ENDLOOP;
    throttle ← 0;
    clumpsSinceBump ← 0;
    END;

  -- Build and send an ACK announcing how much more we are willing to receive.
  -- Reuses the packet parked in c (the one InputPacket is working on) when
  -- there is one; otherwise tries for a free buffer and gives up silently,
  -- leaving sendAck set, if none is available.
  SendAck: INTERNAL PROCEDURE =
    BEGIN OPEN mine;
    ackPup: PupBuffer;
    IF c = NIL THEN
      BEGIN
      ackPup ← MaybeGetFreePupBuffer[];
      IF ackPup = NIL THEN RETURN;
      END
    ELSE
      BEGIN
      -- take over the incoming buffer; clearing c stops the caller freeing it
      ackPup ← c;
      c ← NIL;
      END;
    -- body: max bytes per pup, packets we can still queue (never negative), byte allocation
    ackPup.pupBody ← ack[
      dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
      byteAllocate];
    allocatedID ← nextInputID + byteAllocate;
    ackPup.pupType ← ack;
    ackPup.pupLength ← bytesPerAck;
    ackPup.pupID ← Flop[nextInputID];
    ackPup.source ← local;
    ackPup.dest ← remote;
    PupRouterSendThis[ackPup];
    IF CommFlags.doStats THEN StatIncr[statAcksSent];
    -- avoid pinging: we have just talked, so restart the idle timer
    IF state = open AND outEnd # 0 AND allocatedPups # 0 THEN
      timer ← System.GetClockPulses[];
    sendAck ← FALSE;
    END;

  -- Hand the caller the next packet from the input queue.
  -- With an empty queue: raises PupStream.StreamClosing if the stream is
  -- closed, returns NIL at once if dontWait is set, otherwise waits once on
  -- inputReady.  May still return NIL after the wait if nothing arrived.
  -- Sends a pending ACK as soon as the queue has been drained.
  Get: ENTRY PROCEDURE RETURNS [b: PupBuffer] =
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    IF inputQueue.length = 0 THEN
      BEGIN
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      IF dontWait THEN RETURN[NIL];
      WAIT inputReady;
      END;
    IF inputQueue.length = 0 THEN b ← NIL ELSE b ← DequeuePup[@inputQueue];
    IF b = NIL THEN
      BEGIN
      -- woke up empty handed: either the stream died meanwhile or nothing came
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      RETURN;
      END;
    IF CommFlags.doStats THEN
      SELECT b.pupType FROM
        mark, aMark => StatIncr[statMarksReceived];
        data, aData =>
          BEGIN
          StatIncr[statDataPacketsReceived];
          StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader];
          END;
        ENDCASE => Glitch[NeitherDataNorMark];
    IF sendAck AND inputQueue.length = 0 THEN SendAck[];
    END;

  -- Client entry for sending a data packet.  When debugging, insists that the
  -- buffer still carries the default requeue procedure.  Sends via SendPacket
  -- while the stream is open or talking; Glitches if the stream was never
  -- opened; otherwise hands the buffer to StreamDied.
  Put: PROCEDURE [b: PupBuffer] =
    BEGIN OPEN mine;
    IF CommFlags.doDebug AND b.requeueProcedure # BufferDefs.ReturnFreeBuffer THEN
      Glitch[BufferAlreadyRequeued];
    SELECT state FROM
      idle, halfOpen => Glitch[StreamNotOpen];
      open, talking =>
        BEGIN
        IF CommFlags.doStats THEN
          BEGIN
          StatIncr[statDataPacketsSent];
          StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader];
          END;
        SendPacket[b];
        END;
      ENDCASE => StreamDied[@mine, b]; -- end, closed, finishing
    END;

  -- Send a mark: a one-byte aMark packet whose single data byte is the mark
  -- value.  Gets its own buffer with GetFreePupBuffer.
  PutMark: PROCEDURE [byte: Byte] =
    BEGIN OPEN mine;
    markPup: PupBuffer ← GetFreePupBuffer[];
    markPup.pupType ← aMark;
    markPup.pupLength ← bytesPerPupHeader + 1;
    markPup.pupBytes[0] ← byte;
    SendPacket[markPup];
    IF CommFlags.doStats THEN StatIncr[statMarksSent];
    END;

  -- Stamp and transmit one outgoing packet under the monitor lock (called by
  -- Put and PutMark).  In open/talking the packet gets the next output ID and
  -- our addresses, nextOutputID advances by its data length, and a data packet
  -- is promoted to aData when the send window is used up.  After sending
  -- anything other than plain data the caller is held in WaitToSend.
  -- idle/halfOpen Glitch with StreamNotOpen; any other state goes to StreamDied.
  SendPacket: ENTRY PROCEDURE [b: PupBuffer] =
    BEGIN OPEN mine;
    SELECT state FROM
      open, talking =>
	BEGIN
	b.pupID ← Flop[nextOutputID];
	-- A header-only packet carries no data: it is not counted as unacked.
	IF b.pupLength = bytesPerPupHeader THEN probeCounter ← 1
	ELSE
	  BEGIN
	  -- presumably the lower level calls requeueProcedure when it is done
	  -- with the buffer, which is how it reaches PutOnSentQueue - confirm
	  b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
	  state ← talking;
	  unackedPups ← unackedPups + 1;
	  END;
	b.source ← local;
	b.dest ← remote;
	-- IDs count data bytes, so advance by the payload length only.
	nextOutputID ← nextOutputID + (b.pupLength - bytesPerPupHeader);
	-- Out of byte or packet allocation after this one: ask for an ack.
	-- (Same window test that WaitToSend loops on.)
	IF b.pupType = data -- we don't use mark, only aMark
	   AND ~((maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups)
	  THEN b.pupType ← aData;
	-- Start timing the response; the ack handler uses aDataOut and timer.
	IF b.pupType # data THEN -- aMark or aData
	  BEGIN timer ← System.GetClockPulses[]; aDataOut ← TRUE; END;
	PupRouterSendThis[b];
	-- Block the sender until acked and allocation is available again.
	IF b.pupType # data THEN WaitToSend[ ! UNWIND => NULL];
	END;
      idle, halfOpen => Glitch[StreamNotOpen];
      ENDCASE --end, closed, finishing-- => StreamDied[@mine, b ! UNWIND => NULL];
    END;

  -- Hold the sender (inside the monitor, waiting on stateChange) until
  -- (1) the state is back to open, i.e. everything sent has been acked, and
  -- (2) the window test passes: byte allocation remains and
  --     allocatedPups > unackedPups.
  -- Then applies any pending throttle request.  If the state leaves
  -- open/talking at any point, control goes to StreamDied; idle/halfOpen on
  -- entry Glitches with StreamNotOpen.
  WaitToSend: INTERNAL PROCEDURE =
    BEGIN OPEN mine;
    SELECT state FROM
      open, talking =>
	BEGIN
	-- Wait until all our packets have been acked so we don't shoot them down.
	DO
	  SELECT state FROM
	    open => EXIT;
	    talking => WAIT stateChange;
	    ENDCASE => StreamDied[@mine, NIL];
	  ENDLOOP;
	-- now wait for allocate
	UNTIL (maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups DO
	  SELECT state FROM
	    -- not really, but .....

	    open, talking => WAIT stateChange;
	    ENDCASE => StreamDied[@mine, NIL];
	  ENDLOOP;
	-- throttle is only acted on here, once the pipe has drained
	IF throttle > 0 THEN ThrottleBack[];
	END;
      idle, halfOpen => Glitch[StreamNotOpen];
      ENDCASE --end, closed, finishing-- => StreamDied[@mine, NIL];
    END;

  -- Requeue procedure that SendPacket installs on data-bearing buffers; it
  -- receives the buffer back after transmission.  If the packet has already
  -- been acknowledged it is released at once (and the state advances when it
  -- was the last one outstanding); otherwise it is kept for possible
  -- retransmission, in sentBuffer if that slot is free, else on sentQueue.
  PutOnSentQueue: ENTRY PROCEDURE [b: PupBuffer] =
    BEGIN OPEN mine; -- better not SIGNAL from here
    -- Compare IDs by subtracting and testing the sign, exactly as Retransmitter
    -- does.  A direct "ackedID > Flip[b.pupID]" gives the wrong answer when the
    -- ID sequence crosses the LONG INTEGER sign boundary, which would leave an
    -- acked buffer queued or free one that has not been acked.
    IF ackedID - Flip[b.pupID] > 0 THEN
      BEGIN
      IF (unackedPups ← unackedPups - 1) = 0 THEN
        BEGIN
        SELECT state FROM -- last packet has been ACKed

          talking => state ← open;
          finishing => state ← end;
          ENDCASE => SendAbort[@mine];
        BROADCAST stateChange;
        END;
      ReturnFreePupBuffer[b];
      END
    ELSE IF sentBuffer = NIL THEN sentBuffer ← b ELSE EnqueuePup[@sentQueue, b];
    END;

  -- Body of the input loop (installed as mine.slurp): keep pulling packets off
  -- the socket and feed each one to InputPacket until pleaseDie is set.
  -- A NIL from socket.get is simply skipped.
  Slurp: PROCEDURE =
    BEGIN OPEN mine;
    arrivedPup: PupBuffer;
    WHILE ~pleaseDie DO
      arrivedPup ← socket.get[];
      IF arrivedPup = NIL THEN LOOP;
      InputPacket[arrivedPup];
      ENDLOOP;
    END;

  -- Process one packet received on this connection's socket (called by Slurp).
  --
  -- Buffer ownership: the packet is parked in c (a field of mine) for as long
  -- as this procedure still owns it.  Whoever consumes it (the input queue,
  -- SendAck reusing it for the reply) sets c to NIL; whatever is still in c at
  -- the end is returned to the free pool.
  --
  -- Packets other than rfc/error whose source socket is not our partner's are
  -- rejected outright.  data/aData/mark/aMark are queued for Get when they are
  -- exactly the next expected bytes, otherwise dropped; aData/aMark request an
  -- ACK.  ack packets update the round-trip estimate, the allocation and the
  -- sent queue.  Every other type is passed on to GotOther.
  InputPacket: ENTRY PROCEDURE [b: PupBuffer] =
    BEGIN OPEN mine;
    thisID: LONG INTEGER ← Flip[b.pupID];
    c ← b;

    IF ~(b.pupType = rfc OR b.pupType = error) AND b.source.socket # remote.socket
      THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource];
      ReturnFreePupBuffer[b];
      c ← NIL;
      -- The buffer has been given back: stop here.  Falling through would
      -- dispatch on a freed packet and could dereference or enqueue a NIL c.
      RETURN;
      END;

    SELECT b.pupType FROM
      data, aData, mark, aMark =>
        SELECT state FROM
          open, talking, end =>
            BEGIN
            -- the "a" variants ask for an acknowledgement
            SELECT b.pupType FROM aData, aMark => sendAck ← TRUE; ENDCASE;
            IF b.pupLength > bytesPerPupHeader THEN
              BEGIN
              -- distance of this packet from the next byte we expect
              offset: INTEGER ← Diff[thisID, nextInputID];
              SELECT offset FROM
                0 =>
                  BEGIN -- nice - just what we wanted
                  nextInputID ← nextInputID + (c.pupLength - bytesPerPupHeader);
                  EnqueuePup[@inputQueue, c];
                  NOTIFY inputReady;
                  c ← NIL; -- now owned by the input queue
                  END;
                ENDCASE =>
                  BEGIN
                  -- out of sequence: count it (early, repeat, or way off) and drop it
                  IF CommFlags.doStats THEN
                    SELECT offset FROM
                      IN (0..duplicateWindow] =>
                        StatIncr[statDataPacketsReceivedEarly];
                      IN (-duplicateWindow..0) =>
                        StatIncr[statDataPacketsReceivedAgain];
                      ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
                  ReturnFreePupBuffer[c];
                  c ← NIL;
                  END;
              END
            ELSE -- funny length from way back there
              BEGIN
              -- NOTE(review): this SendAck sits inside the doStats test, so
              -- with stats compiled in a probe is answered even when the input
              -- queue is not empty; without stats only the test below applies.
              -- Left unchanged - confirm which behaviour is intended.
              IF CommFlags.doStats THEN
                IF b.pupLength = bytesPerPupHeader AND b.pupType = aData THEN
                  BEGIN StatIncr[statProbesReceived]; SendAck[]; END
                ELSE StatIncr[statEmptyFunnys];
              END;
            -- answer probes immediately
            IF sendAck AND inputQueue.length = 0 THEN SendAck[];
            END;
          halfOpen => SendRfc[@mine];
          ENDCASE --idle, closed, finishing-- => SendAbort[@mine];

      ack =>
        BEGIN
        -- too short to hold the ack body: ignore
        IF b.pupLength < bytesPerAck THEN
          BEGIN
          IF CommFlags.doStats THEN StatIncr[statMouseTrap];
          GOTO SkipThisAck;
          END;
        IF CommFlags.doStats THEN StatIncr[statAcksReceived];
        -- Try to avoid the funny stable SLOW case
        -- (an old ack, or a repeat of the current one that no probe asked for)
        IF (thisID - ackedID) < 0 OR (thisID = ackedID AND probeCounter = 0) THEN
          BEGIN
          IF CommFlags.doStats THEN StatIncr[statDuplicateAcks];
          GOTO SkipThisAck;
          END;
        probeCounter ← 0;
        IF aDataOut THEN
          BEGIN
          -- timer was started when the aData/aMark went out: fold the measured
          -- response time into the retransmission interval
          myRetrTime, responseTime: System.Pulses;
          responseTime ← [System.GetClockPulses[] - timer];
          responseTime ← [MIN[responseTime, maxRetransmitPulses]];
          -- retransmitPulses ← 2*Smooth[responseTime]
          -- retransmitPulses ← 2*((7/8)*(retransmitPulses/2)+(1/8)*responseTime)
          -- retransmitPulses ← (7*retransmitPulses+2*responseTime)/8
          myRetrTime ← retransmitPulses;
          myRetrTime ← [(6*myRetrTime + myRetrTime + 2*responseTime)/8];
          myRetrTime ←
            [MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]]];
          retransmitPulses ← myRetrTime;
          aDataOut ← FALSE;
          clumpsSinceBump ← clumpsSinceBump + 1;
          IF clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[];
          END;
        -- wake senders parked in WaitToSend when allocation was exhausted
        IF allocatedPups = 0 THEN BROADCAST stateChange;
        hisMaxAllocate ← b.numberOfPupsAhead;
        IF hisMaxAllocate = 0 THEN
          BEGIN
          probeCounter ← 1;
          IF CommFlags.doStats THEN StatIncr[statEmptyAlloc];
          END;
        allocatedPups ← MIN[hisMaxAllocate, pathMaxAllocate];
        IF outEnd = 0 THEN BROADCAST stateChange; -- in case first time
        outEnd ← MIN[b.maximumBytesPerPup, dataBytesPerPup];
        IF ~sameNet THEN outEnd ← MIN[outEnd, PupTypes.maxDataBytesPerGatewayPup];
        -- only ever move the byte allocation forward
        IF (thisID - maxOutputID) + b.numberOfBytesAhead > 0 THEN
          maxOutputID ← b.numberOfBytesAhead + thisID;
        ackedID ← thisID;
        -- release every sent packet that this ack covers
        IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
        -- Subtract-then-test-sign, as in Retransmitter, so the comparison stays
        -- right when the ID sequence crosses the LONG INTEGER sign boundary.
        WHILE sentBuffer # NIL AND thisID - Flip[sentBuffer.pupID] > 0 DO
          IF (unackedPups ← unackedPups - 1) = 0 THEN
            BEGIN
            SELECT state FROM -- last packet has been ACKed
              talking => state ← open;
              finishing => state ← end;
              ENDCASE => SendAbort[@mine];
            BROADCAST stateChange;
            END;
          ReturnFreePupBuffer[sentBuffer];
          sentBuffer ← DequeuePup[@sentQueue];
          ENDLOOP;
        -- anything still held was not covered by the ack: send it again
        UNTIL sentBuffer = NIL DO
          PupRouterSendThis[sentBuffer];
          IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
          sentBuffer ← DequeuePup[@sentQueue];
          ENDLOOP;
        EXITS SkipThisAck => NULL;
        END;
      ENDCASE => GotOther[@mine, c];

    -- nobody took the buffer: give it back
    IF c # NIL THEN BEGIN ReturnFreePupBuffer[c]; c ← NIL; END;
    END;

  -- Client-visible accessor: asks GetLocalAddress about this instance.
  MyGetLocalAddress: PROCEDURE RETURNS [PupTypes.PupAddress] =
    BEGIN
    RETURN[GetLocalAddress[@mine]];
    END;

  -- Client-visible accessor: asks GetSenderSizeLimit about this instance.
  MyGetSenderSizeLimit: PROCEDURE RETURNS [CARDINAL] =
    BEGIN
    RETURN[GetSenderSizeLimit[@mine]];
    END;

  -- Send an interrupt to the other end.  Only one may be outstanding, so this
  -- waits on stateChange until outIntPending is clear, then claims it and
  -- sends the interrupt with SendInt.
  MySendAttention: ENTRY PROCEDURE =
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    UNTIL ~outIntPending DO
      WAIT stateChange;
      ENDLOOP;
    outIntPending ← TRUE;
    SendInt[@mine];
    END;

  -- Block until an interrupt has arrived that this client has not yet seen
  -- (inIntSeq differs from seenIntSeq), then mark one more as seen.
  MyWaitForAttention: ENTRY PROCEDURE =
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    UNTIL seenIntSeq # inIntSeq DO
      WAIT waitingForInterrupt;
      ENDLOOP;
    seenIntSeq ← seenIntSeq + 1;
    END;

  -- initialization
  -- Runs when the module instance is started: fill in the procedure slots of
  -- the instance record and hand its address back to whoever started us.
  -- (do cold stuff here to avoid recompilation hassles)

  -- the two loop bodies that are run as processes
  mine.retransmitter ← Retransmitter;
  mine.slurp ← Slurp;

  -- the client-visible stream operations
  mine.me.get ← Get;
  mine.me.put ← Put;
  mine.me.putMark ← PutMark;
  mine.me.sendAttention ← MySendAttention;
  mine.me.waitForAttention ← MyWaitForAttention;
  mine.me.getLocalAddress ← MyGetLocalAddress;
  mine.me.getSenderSizeLimit ← MyGetSenderSizeLimit;

  RETURN[@mine];
  END.