-- PacketStreamInstance.mesa (last edited by: BLyon on: March 18, 1981 6:18 PM)
-- Function: The implementation module for an instance of a SPP packet stream.

DIRECTORY
BufferDefs USING [
BufferAccessHandle, OisBuffer, QueueObject, QueueLength, QueueCleanup,
QueueInitialize],
ByteBlt USING [ByteBlt],
CommFlags USING [doDebug, doStats],
CommUtilDefs USING [EnableAborts],
DriverDefs USING [Glitch],
Environment USING [Block],
Inline USING [LowHalf],
OISCP USING [
DequeueSpp, EnqueueSpp, MaybeGetFreeSendOisBufferFromPool,
MaybeGetFreeReceiveOisBufferFromPool, ReturnFreeOisBuffer, unknownConnID],
OISCPTypes USING [ConnectionID, maxBytesPerPkt, WaitTime],
PacketStream: FROM "PacketStream",
Process USING [MsecToTicks, SetTimeout, Ticks, Yield],
Router USING [XmitStatus],
Runtime USING [SelfDestruct],
Socket USING [
Abort, ChannelAborted, ChannelHandle, Create, Delete, GetPacket, GetStatus,
PutPacket, TimeOut],
SocketInternal USING [GetBufferPool, ChannelHandleToSocketHandle],
SpecialSystem USING [NetworkAddress],
StatsDefs USING [StatBump, StatIncr],
System USING [
GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds];

-- Time units have been converted from MilliSeconds to Pulses; However, all interface
-- still are in units of MilliSeconds (like the timeout passed to the MONITOR).
-- GetClock now returns Pulses. Variable names ending in ’Time’ will be in milliseconds
-- (hopefully only initial values), while other variables will be in Pulses (some will end in
-- ’Pulses’).

PacketStreamInstance: MONITOR [
localAddr, remoteAddr: SpecialSystem.NetworkAddress,
localConnectionID, remoteConnectionID: OISCPTypes.ConnectionID,
establish: BOOLEAN, timeout: OISCPTypes.WaitTime ← PacketStream.defaultWaitTime,
classOfService: PacketStream.ClassOfService ← bulk]
RETURNS [
PacketStream.Handle, SpecialSystem.NetworkAddress, OISCPTypes.ConnectionID]

IMPORTS
BufferDefs, ByteBlt, CommUtilDefs, DriverDefs, Inline, Process,
Runtime, Socket, SocketInternal, StatsDefs, System, PacketStream, OISCP
EXPORTS System
SHARES BufferDefs =
BEGIN OPEN DriverDefs, OISCP, Router, PacketStream, StatsDefs;

-- EXPORTED TYPE(S)
NetworkAddress: PUBLIC TYPE = SpecialSystem.NetworkAddress;

-- client interface
ps: Object ←
[NIL, TakeFromUser, GetForUser, WaitForAttention, SetWaitTime, FindAddresses,
GetSenderSizeLimit, ReturnGetSppDataBuffer]; -- pool (NIL) initted later

-- connection control parameters
-- connection state
state: PacketStream.State;
whySuspended: PacketStream.SuspendReason;
whyFailed: PacketStream.FailureReason;
stateBeforeSuspension: PacketStream.State;
connectionIsEstablished: CONDITION;
-- socket interface data
socketCH: Socket.ChannelHandle;
pool: BufferDefs.BufferAccessHandle;
-- this is the buffer pool from which we get buffers
-- sequencing, duplicate suppression and flow control information
nextInputSeq, maxInputSeq: CARDINAL;
unackedOutputSeq, nextOutputSeq, maxOutputSeq: CARDINAL;
newAllocation: CONDITION;
sendAnAck: BOOLEAN;
-- connection control
maxOisPktLength: CARDINAL;
-- input attention control
newInputAttn: CONDITION;
-- table to suppress attentions that the client has seen, but we haven’t acked.
-- we permit only a small number of pending attns, the rest are thrown away!
attnSeqTable: ARRAY (0..maxPendingAttns] OF CARDINAL;
attnCount: CARDINAL;
sentBuffer: BufferDefs.OisBuffer; -- hold the head of the sentQueue
-- input/output pointers and queues
inOrderQueue, inAttnQueue, outOfOrderQueue, tempQueue, sentQueue:
BufferDefs.QueueObject;
inOrderQueueNotEmpty: CONDITION;
-- variables and parameters for this packet stream
ackRequestPulse: LONG CARDINAL; -- the time an ack was requested, a Pulse
lastPacketReceivedPulse: LONG CARDINAL;
-- time any packet was rcv’d for this connection, a Pulse
lastProbePulse: LONG CARDINAL; -- retransmitter’s last probe sent time, a Pulse
probeCounter: CARDINAL; -- retransmitter’s allocation probe counter
probeRetransmitPulses: LONG CARDINAL; -- in Pulses
dataPacketRetransmitPulses: System.Pulses; -- in Pulses
waitPulses: LONG CARDINAL;
-- maximum time (in Pulses) any blocking procedure should block
delayCount: CARDINAL; -- number of data packets used in delay calculation
delaySum: System.Pulses;
-- cumulative time (in Pulses) that delayCount packets spent on retransmit queue
lastDelayCalculationPulse: LONG CARDINAL;
-- a Pulses, last time we updated retransmit time
seqNumWhenDelayCalculated: CARDINAL;
-- process handles and specific parameters for controlling them
pleaseStop: BOOLEAN;
letClientRun: CONDITION; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
retransmitterFork, receiverFork: PROCESS;
retransmitterWakeupTimer: CONDITION;
-- temporary stats
normalRetransUpdates: CARDINAL; -- **temp
doubleRetransUpdates: CARDINAL; -- **temp

-- constants for bulk transfer performance
-- 10 packet buffers at the socket.
-- 3 are for sending, 3 for receiving.
-- 1 for sending a system packet and 1 for receiving a system packet
-- 1 for receiving an attention packet
-- 1 for extra receive packet
bulkSendBufs: CARDINAL = 4;
bulkReceiveBufs: CARDINAL = 6;
bulkInitialAllocation: CARDINAL = 3;

-- constants for transactional transfer performance
-- 7 packet buffers at the socket.
-- 2 are for sending, 2 for receiving.
-- 1 for sending a system packet and 1 for receiving a system packet
-- 1 for extra receive packet for attention or whatever
transactSendBufs: CARDINAL = 3;
transactReceiveBufs: CARDINAL = 4;
transactInitialAllocation: CARDINAL = 2;

defaultSendAllocation: CARDINAL = 1; -- more will screw simple guys like 860
duplicateWindow: INTEGER = 100;
maxPendingAttns: CARDINAL = 5;
-- some day these will become variables whose value will be determined heuristically
-- using some kind of adaptive algorithm
retransmissionsBeforeGiveUp: CARDINAL = 30;
retransmissionsBeforeAskForAck: CARDINAL = 2;
probesBeforeGiveUp: CARDINAL = 8; -- therefore 40 secs or inactivity
emptyInOrderQueue: INTEGER = 0;
minSendAllocationBeforeRequestingAck: INTEGER = 0;
minRcvAllocation: INTEGER = 0;
initialDataPacketRetransmitTime: CARDINAL = 2000; -- msecs
maxDataPacketRetransmitTime: CARDINAL = 20000; -- msecs
maxDataPacketRetransmitPulses: System.Pulses ← MilliSecondsToPulses[
maxDataPacketRetransmitTime];
initialRetransmitterTime: CARDINAL = 250; -- msecs
probeMultiplier: CARDINAL = 4; -- probe time is this multiple of retrans timeout
delayCalculationTime: CARDINAL = 10000;
-- 10 secs (keep < 1 min to assure that delaySum doesn’t overflow)
delayCalculationPulses: LONG CARDINAL ← MilliSecondsToPulses[
delayCalculationTime];
inactiveConnectionTime: CARDINAL = 5000; -- msecs
inactiveConnectionPulses: LONG CARDINAL ← MilliSecondsToPulses[
inactiveConnectionTime];
fortyMsecsOfPulses: LONG CARDINAL ← MilliSecondsToPulses[40];
fiveHundredMsecsOfPulses: LONG CARDINAL ← MilliSecondsToPulses[500];
-- the following four constants are weights used in UpdateRetransmitTime
wOldAbove: CARDINAL ← 1;
wOldBelow: CARDINAL ← 2;
wNewAbove: CARDINAL ← 50;
wNewBelow: CARDINAL ← 2;


-- We are using an adaptive scheme for deciding when packets must be retransmitted.
-- We assume that dataPacketRetransmitPulses must elapse since the last time
-- ackRequestPulse was updated before retransmitting everything on the sentQueue.
-- ackRequestPulse is updated whenever a data packet with the sendAck bit set is
-- transmitted or retransmitted, or when a packet is received that updates the
-- unackedOutputSeq field. We transmit
-- a probe whenever no packet has been received for 5 sec. The connection
-- can become suspended if we send 8 probes without a response (probes being
-- generated because of allocation requirements or the 5 sec threshold), or if we
-- retransmit a packet too many times. This stuff is very tricky and need careful
-- thought for the next Pilot.


-- various Glitches generated by this module
StreamNotEstablished: ERROR = CODE;
StreamTerminating: ERROR = CODE;

-- Hot Procedures

-- This procedure gives a Sequenced Packet Protocol packet to be transmitted over
-- the specified packet stream. This packet resides in a buffer, and some of the fields
-- of the packet must be filled in by the caller of this procedure. These fields are
-- oisPktLength, the attention flag, and subtype. This buffer will be returned to the
-- free queue once the packet has been acknowledged. SIGNALs generated by this
-- procedure leave the clean up responsibility for the buffer to the caller.

TakeFromUser: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN

TakeFromUserLocked: ENTRY PROCEDURE = INLINE
BEGIN
ENABLE UNWIND => NULL;
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
IF LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] <= 0 THEN
BEGIN
b.ois.systemPacket ← FALSE;
-- we should ask for an acknowledgement to be returned, if on sending this
-- packet there is space only for minAllocationBeforeRequestingAck, or this is
-- an attention. We should do this only if, our client hasn’t already set the bit.
IF NOT b.ois.sendAck THEN
b.ois.sendAck ← b.ois.attention OR LOOPHOLE[maxOutputSeq - nextOutputSeq,
INTEGER] = minSendAllocationBeforeRequestingAck;
b.ois.unusedType ← 0;
PrepareSequencedPacket[b];
END
ELSE
BEGIN
WAIT newAllocation; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- TakeFromUserLocked

TakeFromUserLocked[]; -- either returns or generates a SIGNAL.
IF CommFlags.doStats THEN StatIncr[statDataPacketsSent];
IF CommFlags.doStats THEN
StatBump[statDataBytesSent, b.ois.pktLength - bytesPerSequencedPktHeader];
PutPacketOnSocketChannel[b];
END; -- TakeFromUser

-- This procedure gets the next sequenced, duplicate-suppressed packet from the
-- packet stream. This procedure hangs till a packet is available. The client is
-- expected to return the buffer to the free queue. If the wait times expires then
-- NIL will be returned, and so the client MUST test for this case.

GetForUser: PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
BEGIN
returnAnAck: BOOLEAN ← FALSE;

GetForUserLocked: ENTRY PROCEDURE = INLINE
BEGIN
ENABLE UNWIND => NULL;
startPulse, now: LONG CARDINAL;
startPulse ← System.GetClockPulses[];
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
BEGIN
IF (b ← DequeueSpp[@inOrderQueue]) # NIL THEN
BEGIN
-- removed code to return an ack here
-- IF sendAnAck AND BufferDefs.QueueLength[@inOrderQueue] =
-- emptyInOrderQueue THEN
-- BEGIN
-- sendAnAck ← FALSE; set the global switch to false
-- returnAnAck ← TRUE; we must send back the ack

-- END;
IF CommFlags.doStats THEN
BEGIN
StatIncr[statDataPacketsReceived];
StatBump[
statDataBytesReceived,
b.ois.pktLength - bytesPerSequencedPktHeader];
END;
END
ELSE
BEGIN
now ← System.GetClockPulses[];
IF (now - startPulse) >= waitPulses THEN EXIT;
WAIT inOrderQueueNotEmpty; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- GetForUserLocked

GetForUserLocked[];
-- we now ack when we receive the packet IF returnAnAck THEN SendSystemPacket[FALSE];
END; -- GetForUser

-- This procedure waits until a packet with the attention bit arrives, and returns
-- an advance copy to the client, who must return it to the buffer pool. The client will
-- not get duplicates, though it may see the advance copies of the attention
-- packet after it has seen the real copy if it is sluggish.

WaitForAttention: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
BEGIN
ENABLE UNWIND => NULL;
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
BEGIN
IF (b ← DequeueSpp[@inAttnQueue]) # NIL THEN EXIT
ELSE
BEGIN
WAIT newInputAttn; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- WaitForAttention

-- This procedure returns a processed spp data buffer to its packet stream socket’s pool. It also increases the allocation window

ReturnGetSppDataBuffer: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN

JustOpenedTheWindow: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
{RETURN[LOOPHOLE[(maxInputSeq ← maxInputSeq + 1) - nextInputSeq, INTEGER] = 1]};

OISCP.ReturnFreeOisBuffer[b];
IF JustOpenedTheWindow[] THEN SendSystemPacket[FALSE]; -- just opened a closed window
END;

-- This procedure causes the transmission of a system packet, i.e. one that does not
-- consume any sequence number. The subtype of this packet is irrelavent and so is
-- made zero. The procedure takes an argument indicating whether the send
-- acknowledgement flag should be set or not in the outgoing packet.

SendSystemPacket: PROCEDURE [sendAck: BOOLEAN] =
BEGIN
b: BufferDefs.OisBuffer;

SendSystemPacketLocked: ENTRY PROCEDURE = INLINE
BEGIN PrepareSequencedPacket[b]; END; -- SendSystemPacketLocked

IF (b ← OISCP.MaybeGetFreeSendOisBufferFromPool[pool]) # NIL THEN
BEGIN
-- b.requeueProcedure should ony be set and inspected by level1 and network drivers,
b.ois.pktLength ← bytesPerSequencedPktHeader;
b.ois.systemPacket ← TRUE;
b.ois.sendAck ← sendAck;
b.ois.attention ← FALSE;
b.ois.unusedType ← 0;
b.ois.subtype ← 0;
SendSystemPacketLocked[];
PutPacketOnSocketChannel[b];
END;
END; -- SendSystemPacket

-- This procedure takes a buffer and prepares it for transmission by filling in the
-- appropriate fields in the buffer and updating the packet stream state variables.

PrepareSequencedPacket: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] = INLINE
BEGIN
-- b.requeueProcedure should ony be set and inspected by level1 and network drivers
-- b.length filled by router
-- b.network filled by router
-- b.iocbChain filled by network driver
-- b.userPtr not used currently
-- b.userData not used currently
-- b.userDataLength not used currently
-- b.status used by the socket and router code
-- b.time used to determine round-trip delay. Perhaps someday individual retransmit timeout. Filled when put on retransmit queue
b.tries ← 0;
-- rest used by queue package, drivers and dispatcher
-- encapsulation filled by router
-- b.checksum filled by router
-- b.ois.pktLength filled by caller of this routine
-- b.transportControl filled by router
b.ois.transCntlAndPktTp.packetType ← sequencedPacket;
b.ois.destination ← remoteAddr;
-- b.ois.source filled by socket interface
-- b.ois.systemPacket filled by caller of this routine
-- b.ois.sendAck filled by acknowledgement strategy by caller
-- b.ois.attention filled by caller of this routine
-- b.ois.unusedType filled by caller of this routine
-- b.ois.subtype filled by caller of this routine
-- b.ois.endOfMessage filled by caller of this routine
b.ois.sourceConnectionID ← localConnectionID;
b.ois.destinationConnectionID ← remoteConnectionID;
b.ois.sequenceNumber ← nextOutputSeq;
b.ois.acknowledgeNumber ← nextInputSeq;
b.ois.allocationNumber ← maxInputSeq;
IF NOT b.ois.systemPacket THEN nextOutputSeq ← nextOutputSeq + 1
ELSE
BEGIN
IF CommFlags.doStats THEN StatIncr[statAcksSent];
IF CommFlags.doStats THEN StatIncr[statSystemPacketsSent];
END;
IF b.ois.sendAck AND CommFlags.doStats THEN StatIncr[statAckRequestsSent];
IF b.ois.attention AND CommFlags.doStats THEN StatIncr[statAttentionsSent];
END; -- PrepareSequencedPacket

-- This procedure puts a packet out on the socket channel, where the packet is
-- asynchronously sent. On completion the dispatcher enqueues the packet onto
-- the sentQueue for possible retransmission only if the packet is not a system packet.
-- The packet stream monitor should not be locked when we call this.

PutPacketOnSocketChannel: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
IF ~b.ois.systemPacket THEN
BEGIN
b.requeueProcedure ← LOOPHOLE[EnqueueTransmittedPacketAppropriately];
-- remember time for retransmitter
IF b.ois.sendAck THEN ackRequestPulse ← System.GetClockPulses[];
END;
Socket.PutPacket[
socketCH, b !
Socket.ChannelAborted =>
-- we are about to be deleted
BEGIN b.requeueProcedure[b]; CONTINUE; END];
END; -- PutPacketOnSocketChannel

EnqueueTransmittedPacketAppropriately: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN

FailConnection: INTERNAL PROCEDURE [why: FailureReason] =
BEGIN
SuspendStream[noRouteToDestination];
whyFailed ← why; -- inform the creator if there is one

END; -- FailConnection

-- check for failures (this may be the first packet, so look for connection failures)
SELECT LOOPHOLE[b.status, XmitStatus] FROM
pending, goodCompletion, aborted, invalidDestAddr => NULL;
-- ok or not expected

noRouteToNetwork, hardwareProblem => FailConnection[noRouteToDestination];
noTranslationForDestination => FailConnection[noTranslationForDestination];
noAnswerOrBusy => FailConnection[noAnswerOrBusy];
circuitInUse => FailConnection[circuitInUse];
circuitNotReady => FailConnection[circuitNotReady];
noDialingHardware => FailConnection[noDialingHardware];
dialerHardwareProblem => FailConnection[noRouteToDestination];
ENDCASE => ERROR;
-- enqueue onto sentQueue since this is not a system packet
EnqueueSpp[@sentQueue, b];
IF b.tries = 0 AND b.ois.sendAck THEN b.time ← System.GetClockPulses[];
-- record time if ackReq packet and first time put there (used for delay calculation)

END; -- EnqueueTransmittedPacketAppropriately

-- This procedure changes the state of the packet stream to be suspended.

SuspendStream: INTERNAL PROCEDURE [why: SuspendReason] =
BEGIN
stateBeforeSuspension ← state;
state ← suspended;
whySuspended ← why;
END; -- SuspendStream

-- This procedure changes the state of the packet stream to be suspended.

SuspendStreamLocked: ENTRY PROCEDURE [why: SuspendReason] =
BEGIN SuspendStream[why]; END; -- SuspendStreamLocked

-- This process retransmits packets that have not been acknowledged in a reasonable
-- time, and in addition generates probes etc. to test for the connection’s livenesss.
-- The Retransmitter, Receiver and the transmission process all three try to keep the
-- sentBuffer and sentQueue ordered by sequence number.

Retransmitter: PROCEDURE =
BEGIN
now: LONG CARDINAL;
retransBuffer: BufferDefs.OisBuffer;

-- some procedures whose scope is just the Retransmitter

TimeToSendAProbe: ENTRY PROCEDURE RETURNS [goAhead: BOOLEAN] = INLINE
BEGIN
goAhead ← FALSE;
IF state = established OR state = open THEN
BEGIN
IF
((LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] > 0 AND
(now - lastProbePulse) > probeRetransmitPulses)) OR
(((now - lastPacketReceivedPulse) >= inactiveConnectionPulses AND
(now - lastProbePulse) >= inactiveConnectionPulses)) THEN
BEGIN
-- probe for allocation, or just an ack from other end, or for activity.
lastProbePulse ← now;
IF (probeCounter ← probeCounter + 1) > probesBeforeGiveUp THEN
SuspendStream[transmissionTimeout]
ELSE BEGIN goAhead ← TRUE; IF CommFlags.doStats THEN StatIncr[statProbesSent]; END;
END;
END;
END; -- TimeToSendAProbe

RetransmissionCount: ENTRY PROCEDURE RETURNS [CARDINAL] = INLINE
BEGIN
IF state = established OR state = open THEN
BEGIN
IF (now - ackRequestPulse) >= dataPacketRetransmitPulses THEN
RETURN[sentQueue.length + (IF sentBuffer = NIL THEN 0 ELSE 1)]
-- we must retransmit all packets currently on the sentQueue and sentBuffer.
-- a count is kept because on retransmission we put packets back on
-- sentQueue and otherwise we will be in an infinite loop.

ELSE RETURN[0]; -- not yet time to retransmit

END
ELSE RETURN[0]; -- any other state requires no retransmission.

END; -- RetransmissionCount

GetFromRetransmissionQueue: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] = INLINE
BEGIN
IF sentBuffer = NIL THEN b ← DequeueSpp[@sentQueue]
ELSE BEGIN b ← sentBuffer; sentBuffer ← NIL; END;
IF b = NIL THEN RETURN;
SELECT b.tries FROM
>= retransmissionsBeforeGiveUp =>
BEGIN -- give up trying to send anything on this packet stream.
SuspendStream[transmissionTimeout];
sentBuffer ← b; -- put back at head of sentQueue
b ← NIL;
END;
= retransmissionsBeforeAskForAck => b.ois.sendAck ← TRUE;
-- need to set ack request

ENDCASE;
END; -- GetFromRetransmissionQueue

WaitForAWhile: ENTRY PROCEDURE = INLINE
BEGIN WAIT retransmitterWakeupTimer; END; -- WaitForAWhile

UpdateRetransmitTime: PROCEDURE = INLINE
BEGIN
avgDelay: System.Pulses;
weightNewAbove: CARDINAL;
IF ((now - lastDelayCalculationPulse) >= delayCalculationPulses) THEN
BEGIN -- time to do retransmission timeout determination
IF delayCount > 0 THEN
BEGIN
-- we have had some packets with lossless delay (no retransmissions)
avgDelay ← [delaySum/delayCount];
normalRetransUpdates ← normalRetransUpdates + 1; -- **temp
-- new retransmit timeout is a function of the old retransmit timeout and the
--newly measured delay. If the delay is less than about 500 msecs, the standard
--deviation of the measured delay is large compared to transmission time;
--otherwise transmission time dominates. So for fast mediums, we multiply the
--delay by a large factor. For slow mediums, we go with the measured delay.
-- The following lines are in flux.
weightNewAbove ← IF avgDelay > fiveHundredMsecsOfPulses THEN 1 ELSE wNewAbove;
dataPacketRetransmitPulses ←
[(dataPacketRetransmitPulses / wOldBelow)* wOldAbove +
(avgDelay / wNewBelow)* weightNewAbove];
-- dataPacketRetransmitPulses ←
--[(dataPacketRetransmitPulses + wNewAbove*avgDelay)/2];
delaySum ← [0];
delayCount ← 0;
END
ELSE
BEGIN
-- if data flowed, all had retranmissions and we should increase timeout
IF unackedOutputSeq > seqNumWhenDelayCalculated THEN
BEGIN
doubleRetransUpdates ← doubleRetransUpdates + 1; -- **temp
dataPacketRetransmitPulses ← [dataPacketRetransmitPulses*2];
END;
END;
seqNumWhenDelayCalculated ← unackedOutputSeq;
dataPacketRetransmitPulses ← [MIN[
dataPacketRetransmitPulses, maxDataPacketRetransmitPulses]];
probeRetransmitPulses ←
probeMultiplier*dataPacketRetransmitPulses + fortyMsecsOfPulses;
Process.SetTimeout[
@retransmitterWakeupTimer, PulsesToTicks[[dataPacketRetransmitPulses/4]]];
-- set new retransmitter wakeup

END;
END; -- UpdateRetransmitTime

-- main body of the procedure
UNTIL pleaseStop DO
-- checks the value of pleaseStop, race condition OK
now ← System.GetClockPulses[];
THROUGH (0..RetransmissionCount[]] DO
-- this is an upper bound
-- state may change for any reason, but we keep sending them out, unless we
-- ourselves changed the state to be suspended! The Receiver may take packets
-- off the sentQueue in parallel in which case we will get a NIL and therefore exit.
IF (retransBuffer ← GetFromRetransmissionQueue[]) = NIL THEN EXIT;
IF LOOPHOLE[retransBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] < 0
THEN OISCP.ReturnFreeOisBuffer[retransBuffer]
-- packet has been ACKed

ELSE
BEGIN
-- its time to send this packet out again
retransBuffer.tries ← retransBuffer.tries + 1;
PutPacketOnSocketChannel[retransBuffer]; -- requeues buffer on sentQueue
retransBuffer ← NIL;
IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
END;
ENDLOOP;
IF TimeToSendAProbe[] THEN SendSystemPacket[TRUE];
WaitForAWhile[];
UpdateRetransmitTime[];
ENDLOOP;
END; -- Retransmitter

PulsesToTicks: PROCEDURE [pulses: System.Pulses] RETURNS [Process.Ticks] =
INLINE
BEGIN
lastCard: LONG CARDINAL = LAST[CARDINAL];
msecs: LONG CARDINAL ← System.PulsesToMicroseconds[pulses]/1000 + 1;
RETURN[
Process.MsecToTicks[
IF msecs > lastCard THEN LAST[CARDINAL] ELSE Inline.LowHalf[msecs]]];
END;

-- This process waits at the socket channel for a packet to arrive.
-- This packet is in a buffer that belongs to this socket and should be returned to the
-- pool associated with the socket channel.

Receiver: PROCEDURE =
BEGIN
b: BufferDefs.OisBuffer;
UNTIL pleaseStop DO
-- race condition not harmful as we account for it
-- now wait for something to arrive.
b ← Socket.GetPacket[
socketCH ! Socket.TimeOut => RETRY; Socket.ChannelAborted => EXIT];
-- pleaseStop got set by someone else
SELECT LOOPHOLE[b.status, XmitStatus] FROM
goodCompletion =>
SELECT b.ois.transCntlAndPktTp.packetType FROM
sequencedPacket => GotSequencedPacket[LOOPHOLE[b, BufferDefs.OisBuffer]];
error => GotErrorPacket[b];
ENDCASE =>
BEGIN
-- discard, don’t want any other protocol type
OISCP.ReturnFreeOisBuffer[b];
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
END;
ENDCASE => OISCP.ReturnFreeOisBuffer[b];
ENDLOOP;
END; -- Receiver

-- This procedure examines the sequenced packet that has just arrived.

GotSequencedPacket: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
returnAnAck: BOOLEAN ← FALSE;
-- we do not return an ack unless we are required to
giveBackToSocket: BOOLEAN ← TRUE; -- unless we put on some other queue
nowPulse: System.Pulses; -- set if we get a packet for our connection

GotSequencedPacketLocked: ENTRY PROCEDURE = INLINE
BEGIN
SELECT state FROM
unestablished, suspended, terminating => NULL; -- not interested.

activeEstablish, waitEstablish =>
[returnAnAck, giveBackToSocket] ← EstablishThisConnection[b];
established, open =>
BEGIN
-- check that packet is from right remote address and connection ID
IF b.ois.source.host = remoteAddr.host
AND b.ois.source.socket = remoteAddr.socket AND b.ois.sourceConnectionID =
remoteConnectionID AND b.ois.destinationConnectionID = localConnectionID
THEN
BEGIN -- the packet is for this connection
probeCounter ← 0; -- other end is alive
lastPacketReceivedPulse ← nowPulse ← System.GetClockPulses[];
-- for inactivity et al
SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
-- just the packet we wanted, or packet is early

IN [0..duplicateWindow] => RightOrEarlyPacket[];
-- old duplicate packet

IN [-duplicateWindow..0) => DuplicatePacket[];
-- very old duplicate packet

ENDCASE => IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedVeryLate]
END
ELSE
-- packet is really not for this connection (or other end not established)
BEGIN
IF CommFlags.doStats AND
(b.ois.source.host # remoteAddr.host OR b.ois.source.socket #
remoteAddr.socket) THEN StatIncr[statPacketsRejectedBadSource];
IF b.ois.sourceConnectionID # remoteConnectionID OR
b.ois.destinationConnectionID # localConnectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
returnAnAck ← TRUE; -- probably our connection response got lost

END;
END;
END;
ENDCASE;
-- delayed acking has been abandoned; only allocate is dependent on client
-- this is delicate and is done to avoid getting stuck in a small window
-- IF returnAnAck AND BufferDefs.QueueLength[@inOrderQueue] # emptyInOrderQueue
-- THEN
-- BEGIN
-- returnAnAck ← FALSE; we don’t have to send back an ack
-- sendAnAck ← TRUE; make another process send back an ack

-- END;
-- This wait lets the user process run and suck up all the packets on the
-- inOrderInputQueue so that we send back a full allocate.
IF LOOPHOLE[maxInputSeq - nextInputSeq, INTEGER] < minRcvAllocation THEN
WAIT letClientRun;
END; -- GotSequencedPacketLocked

-- This procedure processes both the next expected packet as well as early packets.
-- Both kinds of packets are processed similarly as far as acks and allocation,
-- and attentions are concerned.
RightOrEarlyPacket: INTERNAL PROCEDURE = INLINE
BEGIN
IF b.ois.sendAck THEN
BEGIN
returnAnAck ← TRUE;
IF CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
END;
-- update allocation and ack fields of packetstream.
IF LOOPHOLE[b.ois.allocationNumber - maxOutputSeq, INTEGER] > 0 THEN
BEGIN maxOutputSeq ← b.ois.allocationNumber; NOTIFY newAllocation; END;
IF LOOPHOLE[b.ois.acknowledgeNumber - unackedOutputSeq, INTEGER] > 0 THEN
BEGIN
unackedOutputSeq ← b.ois.acknowledgeNumber;
ackRequestPulse ← nowPulse; -- if we get an ack we move the ack req. time

END;
-- now remove acked packets, if any, from sentQueue. We assume that
-- the sentQueue is kept ordered by increasing sequence number.
IF sentBuffer = NIL THEN sentBuffer ← DequeueSpp[@sentQueue];
UNTIL sentBuffer = NIL OR
LOOPHOLE[sentBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] >= 0 DO
IF sentBuffer.ois.sendAck AND sentBuffer.tries = 0 THEN
BEGIN -- constitutes a measure of lossless delay (no retransmissions)
delaySum ← [delaySum + (nowPulse - sentBuffer.time)];
-- add time spent on queue to delay stats
delayCount ← delayCount + 1;
END;
OISCP.ReturnFreeOisBuffer[sentBuffer];
sentBuffer ← DequeueSpp[@sentQueue];
ENDLOOP;
-- so far we have only updated state information, now to dispense with the packet
IF b.ois.systemPacket THEN
BEGIN
IF CommFlags.doStats THEN
BEGIN
StatIncr[statSystemPacketsReceived];
StatIncr[statAcksReceived];
END;
END
ELSE
-- this is not a system packet process it intelligently
BEGIN
-- if the attention bit is set then we try to process the attention, and only
-- if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
-- only now must we decide if it is in order or out of order
IF LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] = 0 THEN
-- just the packet we wanted
BEGIN
state ← open;
DO
nextInputSeq ← nextInputSeq + 1;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
EnqueueSpp[@inOrderQueue, b];
giveBackToSocket ← FALSE;
NOTIFY inOrderQueueNotEmpty;
-- examine the OutOfOrderQueue to see if anything can be taken off
-- currently there is never anything on the OutOfOrderQueue.
DO
IF BufferDefs.QueueLength[@outOfOrderQueue] = 0 THEN
BEGIN b ← NIL; EXIT; END;
b ← DequeueSpp[@outOfOrderQueue];
SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
0 => EXIT;
-- this is the next in order packet, pick it up in outer loop

IN (0..duplicateWindow] => EnqueueSpp[@tempQueue, b];
-- still out of order

ENDCASE => OISCP.ReturnFreeOisBuffer[b];
-- old duplicate, throw away

ENDLOOP;
-- put buffers that are on tempQueue back onto outOfOrderQueue
UNTIL (tempQueue.first) = NIL DO
EnqueueSpp[@outOfOrderQueue, DequeueSpp[@tempQueue]]; ENDLOOP;
IF b = NIL THEN EXIT;
-- else, loop to process packet from outOfOrderQueue that is now next

ENDLOOP;
END
ELSE IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedEarly];
-- early packet

END;
END;
END; -- RightOrEarlyPacket

DuplicatePacket: INTERNAL PROCEDURE = INLINE
BEGIN
IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedAgain];
-- send acks only if the other end asked
IF b.ois.sendAck THEN returnAnAck ← TRUE;
END; -- DuplicatePacket

-- main body of this sprocedure
IF b.ois.pktLength >= bytesPerSequencedPktHeader THEN
BEGIN
GotSequencedPacketLocked[];
IF returnAnAck THEN SendSystemPacket[FALSE]; -- we may have to send the ack
END
ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
IF giveBackToSocket THEN OISCP.ReturnFreeOisBuffer[b];
END; -- GotSequencedPacket

-- This procedure attempts to processes the attention packet, and returns whether it
-- was successful or not. Success implies that an entry for the attn packet already
-- exists in the attnSeqTable, or has just been made. This packet is now a candidate
-- for the inOrderQueue or the outOfOrderQueue. If an entry can not be made because
-- of space restrictions we pretend we never ever saw this packet and let it be
-- retransmitted from the source. The attnSeqTable is reasonablly big, and so this
-- decision should not cause adverse effects. This procedure is only called if the packet
-- was within the accept window.

AttentionPacketProcessed: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] RETURNS [BOOLEAN] =
BEGIN
i: CARDINAL ← 1;
alreadyThere: BOOLEAN ← FALSE;
advanceB: BufferDefs.OisBuffer;
to, from: Environment.Block;
nBytes: CARDINAL;
IF attnCount = maxPendingAttns THEN RETURN[FALSE]; -- no space
-- insert the b.ois.sequenceNumber in the attnSeqTable if not already there
IF attnCount # 0 -- see if an entry is already there
THEN
FOR i IN (0..attnCount] DO
IF attnSeqTable[i] = b.ois.sequenceNumber THEN
BEGIN alreadyThere ← TRUE; EXIT; END;
ENDLOOP;
IF NOT alreadyThere THEN -- because attnCount=0 or because really not there!
BEGIN
IF (advanceB ← MaybeGetFreeReceiveOisBufferFromPool[pool]) = NIL THEN
RETURN[FALSE]; -- cop out
attnSeqTable[i] ← b.ois.sequenceNumber;
attnCount ← attnCount + 1;
-- make a copy and then enqueue it
to ← [@advanceB.ois, 0, b.ois.pktLength];
from ← [@b.ois, 0, b.ois.pktLength];
nBytes ← ByteBlt.ByteBlt[to: to, from: from];
EnqueueSpp[@inAttnQueue, advanceB];
NOTIFY newInputAttn;
END;
IF CommFlags.doStats THEN StatIncr[statAttentionsReceived];
RETURN[TRUE];
END; -- AttentionPacketProcessed

-- This procedure updates the attnSeqTable when the nextInputSeq is updated.
-- That is to say, we are removing any entries in the attnSeqTable that we plan
-- to ack, the next time we send out a packet.

UpdateAttnSeqTable: INTERNAL PROCEDURE =
BEGIN
i: CARDINAL ← 1;
DO
IF i > attnCount THEN EXIT;
IF LOOPHOLE[(attnSeqTable[i] - nextInputSeq), INTEGER] < 0 THEN
BEGIN
attnSeqTable[i] ← attnSeqTable[attnCount];
attnSeqTable[attnCount] ← 0;
attnCount ← attnCount - 1;
END
ELSE i ← i + 1;
ENDLOOP;
END; -- UpdateAttnSeqTable

-- This procedure examines an error protocol packet.

GotErrorPacket: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
ENABLE UNWIND => NULL;
IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
SELECT b.ois.errorType FROM
noSocketOisErrorCode =>
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
{whyFailed ← noServiceAtDestination;
SuspendStream[noRouteToDestination]};
established, open => SuspendStream[remoteServiceDisappeared];
ENDCASE => NULL;
ENDCASE => NULL;
OISCP.ReturnFreeOisBuffer[b];
END; -- GotErrorPacket

-- Cool Procedures

-- This procedure sets the wait time.

SetWaitTime: ENTRY PROCEDURE [time: OISCPTypes.WaitTime] =
BEGIN waitPulses ← MilliSecondsToPulses[time]; END; -- SetWaitTime

-- This procedure returns the local and remote address of this packet stream.

FindAddresses: ENTRY PROCEDURE RETURNS [local, remote: NetworkAddress] =
BEGIN local ← localAddr; remote ← remoteAddr; END; -- FindAddresses

-- This procedure returns the number of data bytes that can fit into an sppBuffer.

GetSenderSizeLimit: ENTRY PROCEDURE RETURNS [CARDINAL] =
BEGIN RETURN[maxOisPktLength - bytesPerSequencedPktHeader]; END;
-- GetSenderSizeLimit



-- This procedure atempts to establish the local end of the connection
-- to the incoming packet.

EstablishThisConnection: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
RETURNS [returnAnAck, giveBackToSocket: BOOLEAN] =
BEGIN
returnAnAck ← FALSE;
giveBackToSocket ← TRUE;
-- If state=activeEstablish then master/slave response when sockets are completely
-- specified. If state=waitEstablish then other end initiates and knows all about us.
IF b.ois.source.host = remoteAddr.host AND
b.ois.source.socket = remoteAddr.socket AND b.ois.destinationConnectionID =
localConnectionID AND b.ois.sourceConnectionID # unknownConnID AND
b.ois.sequenceNumber = 0 THEN
BEGIN
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF NOT b.ois.systemPacket THEN
BEGIN
-- if the attention bit is set then we try to process the attention, and only
-- if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
nextInputSeq ← nextInputSeq + 1;
EnqueueSpp[@inOrderQueue, b];
giveBackToSocket ← FALSE;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
-- remove knowledge of old attns
state ← open;
NOTIFY inOrderQueueNotEmpty;
END;
END
ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
-- If state=activeEstablish then simultaneous establishment when sockets are
-- completely specified. If state=waitEstablish then other end initiates and knows
-- all about us except our connection ID.

ELSE
IF b.ois.source.host = remoteAddr.host AND
b.ois.source.socket = remoteAddr.socket AND b.ois.destinationConnectionID =
unknownConnID AND b.ois.sourceConnectionID # unknownConnID AND
b.ois.sequenceNumber = 0 THEN
BEGIN
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
IF state = waitEstablish THEN returnAnAck ← TRUE;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF CommFlags.doStats AND b.ois.systemPacket THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
-- If state=activeEstablish then response from well known socket at which
-- there is a server process

ELSE
IF state = activeEstablish AND b.ois.source.host = remoteAddr.host
AND b.ois.source.socket # remoteAddr.socket
AND b.ois.destinationConnectionID = localConnectionID AND
b.ois.sourceConnectionID # unknownConnID AND b.ois.sequenceNumber = 0 THEN
BEGIN
remoteAddr.socket ← b.ois.source.socket;
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF NOT b.ois.systemPacket THEN
BEGIN
-- if the attention bit is set then we try to process the attention, and only
-- if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
nextInputSeq ← nextInputSeq + 1;
EnqueueSpp[@inOrderQueue, b];
giveBackToSocket ← FALSE;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
-- remove knowledge of old attns
state ← open;
NOTIFY inOrderQueueNotEmpty;
END;
END
ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
-- mismatched IDs

ELSE IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
IF returnAnAck AND CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
END; -- EstablishThisConnection

-- This procedure causes the active establishement of the connection. A system packet
-- is transmitted to the remote end, and then this procedure casues the caller to hang
-- for a certain maximum time in which it is hoped that the connection has become
-- established owing to the receipt of an appropriate packet from the remote end.

ActivelyEstablish: PROCEDURE =
BEGIN
currentPulse, startPulse: LONG CARDINAL;

WaitUntilActivelyEstablished: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
BEGIN
ENABLE UNWIND => NULL;
WAIT connectionIsEstablished;
IF state = established OR state = open THEN RETURN[TRUE]
ELSE
BEGIN
IF state = suspended THEN RETURN WITH ERROR ConnectionFailed[whyFailed];
currentPulse ← System.GetClockPulses[];
IF (currentPulse - startPulse) >= waitPulses THEN
BEGIN SIGNAL ConnectionFailed[timeout]; startPulse ← System.GetClockPulses[]; END;
RETURN[FALSE];
END;
END; -- WaitUntilActivelyEstablished

startPulse ← System.GetClockPulses[];
DO
SendSystemPacket[TRUE];
-- waits for 2 seconds to see if the connection is established
IF WaitUntilActivelyEstablished[] -- this returns or raises a SIGNAL -- THEN
RETURN;
ENDLOOP;
END; -- ActivelyEstablish

-- This procedure waits for the remote end to initiate connection establishement.

WaitUntilEstablished: ENTRY PROCEDURE =
BEGIN
ENABLE UNWIND => NULL;
currentPulse, startPulse: LONG CARDINAL;
startPulse ← System.GetClockPulses[];
DO
-- waits for 2 seconds each time around
WAIT connectionIsEstablished;
IF state = established OR state = open THEN RETURN
ELSE
BEGIN
currentPulse ← System.GetClockPulses[];
IF (currentPulse - startPulse) >= waitPulses THEN
BEGIN SIGNAL ConnectionFailed[timeout]; startPulse ← System.GetClockPulses[]; END;
END;
ENDLOOP;
END; -- WaitUntilEstablished

-- This procedure destroys the packet stream. The procedure changes the state of
-- the packet stream, NOTIFYs the Retransmitter and Receiver to self destruct,
-- and then waits until this happens. When this condition is satisfied, the socket
-- channel is deleted and, the packet stream data structures cleaned up.

Delete: PUBLIC PROCEDURE =
BEGIN
DeleteActivate[];
Socket.Abort[socketCH];
JOIN retransmitterFork;
JOIN receiverFork;
DeleteCleanup[];
Socket.Delete[socketCH];
Runtime.SelfDestruct[];
END; -- Delete

DeleteActivate: ENTRY PROCEDURE = INLINE
BEGIN
state ← terminating;
pleaseStop ← TRUE;
NOTIFY retransmitterWakeupTimer;
END; -- DeleteActivate

DeleteCleanup: ENTRY PROCEDURE = INLINE
BEGIN OPEN SocketInternal;
ENABLE UNWIND => NULL;
IF sentBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[sentBuffer];
BufferDefs.QueueCleanup[@inOrderQueue];
BufferDefs.QueueCleanup[@inAttnQueue];
BufferDefs.QueueCleanup[@outOfOrderQueue];
BufferDefs.QueueCleanup[@tempQueue];
WHILE sentQueue.length<ChannelHandleToSocketHandle[socketCH].pool.sendInUse DO
-- waiting for asynchronous puts to complete and be requeue appropriately
WAIT letClientRun; -- wait on any convenient variable - ABORTs must be DISABLED!
ENDLOOP;
BufferDefs.QueueCleanup[@sentQueue];
END; -- DeleteCleanup

MilliSecondsToPulses: PROCEDURE [ms: LONG CARDINAL]
RETURNS [pulses: System.Pulses] =
BEGIN
-- we must be carefull about multiplication overflow since milliSeconds must be
-- converted to microSeconds
IF ms >= LAST[LONG CARDINAL]/1000 -- multiplication overflow condition
THEN pulses ← System.MicrosecondsToPulses[LAST[LONG CARDINAL]]
ELSE pulses ← System.MicrosecondsToPulses[1000*ms];
END; -- end MilliSecondsToPulses


-- initialization (Cool)
-- initialize connection control parameters

-- This initializes the necessary data structures for a packet stream instance.
-- It fills in all the necessary parameters and creates two processes that look
-- after the well being of this packet stream. One is a Retransmitter, that periodically
-- checks the sentQueue to see if there is anything unacknowledged, and the other
-- is the Receiver, which sees if there are any incoming packets on the
-- corresponding socket.

state ← unestablished;
whySuspended ← notSuspended;
stateBeforeSuspension ← unestablished;
Process.SetTimeout[@connectionIsEstablished, Process.MsecToTicks[2000]];
nextInputSeq ← unackedOutputSeq ← nextOutputSeq ← 0;
maxOutputSeq ← nextOutputSeq + defaultSendAllocation - 1;
SELECT classOfService FROM
bulk =>
{socketCH ← Socket.Create[localAddr, bulkSendBufs, bulkReceiveBufs, 0, TRUE];
maxInputSeq ← bulkInitialAllocation};
transactional =>
{socketCH ← Socket.Create[localAddr, transactSendBufs, transactReceiveBufs, 0,
TRUE];
maxInputSeq ← transactInitialAllocation};
ENDCASE => ERROR;
pool ← ps.pool ← SocketInternal.GetBufferPool[
SocketInternal.ChannelHandleToSocketHandle[socketCH]];
localAddr ← Socket.GetStatus[socketCH].localAddr;
IF remoteConnectionID # unknownConnID THEN state ← established;
Process.SetTimeout[@newAllocation, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newAllocation];
sendAnAck ← FALSE;
maxOisPktLength ← OISCPTypes.maxBytesPerPkt;
Process.SetTimeout[@newInputAttn, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newInputAttn];
FOR attnCount IN (0..maxPendingAttns] DO attnSeqTable[attnCount] ← 0; ENDLOOP;
attnCount ← 0;
sentBuffer ← NIL;
BufferDefs.QueueInitialize[@inOrderQueue];
BufferDefs.QueueInitialize[@inAttnQueue];
BufferDefs.QueueInitialize[@outOfOrderQueue];
BufferDefs.QueueInitialize[@tempQueue];
BufferDefs.QueueInitialize[@sentQueue];
Process.SetTimeout[@inOrderQueueNotEmpty, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@inOrderQueueNotEmpty];
ackRequestPulse ← lastProbePulse ← lastDelayCalculationPulse ←
lastPacketReceivedPulse ← System.GetClockPulses[];
probeCounter ← 0;
dataPacketRetransmitPulses ← MilliSecondsToPulses[
initialDataPacketRetransmitTime]; -- msecs converted to Pulses
probeRetransmitPulses ← MilliSecondsToPulses[
probeMultiplier*initialDataPacketRetransmitTime + 40];
-- msecs converted to Pulses
delaySum ← [0];
delayCount ← 0;
normalRetransUpdates ← doubleRetransUpdates ← 0; -- **temp
waitPulses ← MilliSecondsToPulses[timeout]; -- msecs converted to Pulses
pleaseStop ← FALSE;
Process.SetTimeout[@letClientRun, Process.MsecToTicks[250]]; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
Process.SetTimeout[
@retransmitterWakeupTimer, Process.MsecToTicks[initialRetransmitterTime]];
-- This baroque code is to ensure that the state is not looked at by any of the other
-- processes until this initialization code has finished examining this variable without
-- entering the monitor.
IF state = unestablished THEN
BEGIN
IF establish THEN
BEGIN
state ← activeEstablish;
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
ActivelyEstablish[];
END
ELSE
BEGIN
state ← waitEstablish;
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
WaitUntilEstablished[];
END;
END
ELSE
BEGIN
SendSystemPacket[FALSE]; -- gratuitous ack in case we are a server agent now.
-- Retransmissions of this gratuitous packet will ocurr in the nature of probes.
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
END;
waitPulses ← MilliSecondsToPulses[PacketStream.infiniteWaitTime]; -- Gets, WaitAttentions default to very long
RETURN[@ps, remoteAddr, remoteConnectionID];
END. -- of PacketStreamInstance module

LOG

Time: January 26, 1981 10:03 AM By: Garlick Action: 1.) added classOfService parameter to reduce buffers in interactive case 2.) added interpretation of Error Protocol packets 3.) increased inactivity timeouts for internet cases 4.) initial data operation timeout set to infinite 5.) acks no longer stimulated by Gets, rather in receiver 6.) send an alocation packet if just opening the window 7.) reinstated slow media adaptive algorithm 8.) enabled Process.Aborting for Gets, Puts, WaitAttn.