-- NetworkStreamInstance.mesa (last edited by: BLyon on: March 21, 1981  12:33 PM)
-- Function: The implementation module for an instance of the Network Stream front end.

DIRECTORY
  BufferDefs USING [OisBuffer],
  ByteBlt USING [ByteBlt],
  Environment USING [Byte],
  NetworkStreamInternal USING [ControlObject],
  OISCP USING [
    GetOisPacketTextLength, ReturnFreeOisBuffer, SetOisPacketTextLength],
  OISCPTypes USING [bytesPerLevel2SppHeader],
  PacketStream USING [
    ConnectionSuspended, Destroy, Get, GetSendSppBuffer, Handle, Put,
    ReturnGetSppDataBuffer, WaitForAttention],
  Runtime USING [SelfDestruct],
  Stream USING [
    Byte, Word, Handle, Block, CompletionCode, defaultInputOptions, InputOptions,
    LongBlock, Object, ShortBlock, SSTChange, SubSequenceType, TimeOut];

NetworkStreamInstance: MONITOR [psH: PacketStream.Handle] RETURNS [Stream.Handle]
  IMPORTS ByteBlt, OISCP, Stream, PacketStream, Runtime =
  BEGIN

  -- debugging (only reason for MONITOR)
  sanityChecking: BOOLEAN = FALSE;
  sendInProgress: BOOLEAN ← FALSE;
  TestAndSetSendInProgress: ENTRY PROCEDURE = INLINE
    BEGIN
    IF sendInProgress THEN ERROR;
    sendInProgress ← TRUE;
    END;

  TestAndResetSendInProgress: ENTRY PROCEDURE = INLINE
    BEGIN
    IF NOT sendInProgress THEN ERROR;
    sendInProgress ← FALSE;
    END;

  -- the vector of procedures for operating on, and controlling the network stream
  controlObject: NetworkStreamInternal.ControlObject ←
    [
      -- the vector of procedures as per the standard Pilot Stream interface
      streamObject: Stream.Object[
      options: Stream.defaultInputOptions, getByte: GetByte, putByte: PutByte,
      getWord: GetWord, putWord: PutWord, get: GetBlock, put: PutBlock,
      setSST: SetSST, sendAttention: SendAttention, waitAttention: WaitAttention,
      delete: Delete],
      -- handle for the packet stream
      psH: psH];

  LeftAndRight: TYPE = MACHINE DEPENDENT RECORD [left, right: Environment.Byte];

  -- A client will typicaly have three processes accessing this module.  The first
  -- receives data, the second waits for attentions, and the third transmits data, sends
  -- attentions and changes the subsequence type.  As a consequence it is not necessary
  -- for this module to be a monitor, since there is no interaction between the three
  -- processes in this module.  The interaction occurs in the PktStreamInstance module,
  -- which is a monitor.  Multiple client processes must not perform data transfer in one
  -- direction; the result is unpredicatble.  Care should be taken when deleting the stream
  -- and therefore this module.

  -- input
  inputBuffer: BufferDefs.OisBuffer ← NIL;
  inputFinger: CARDINAL;
  inputSST: Stream.SubSequenceType ← 0;
  -- output
  outputBuffer: BufferDefs.OisBuffer ← NIL;
  outputFinger: CARDINAL;
  outputSST: Stream.SubSequenceType ← 0;
  outputSSTSent: BOOLEAN ← TRUE;
  outputBufferSize: CARDINAL ← 0;

  -- Hot Procedures

  GetByte: PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] =
    BEGIN
    IF inputBuffer # NIL AND inputFinger + 2 < GetSppDataLength[inputBuffer] THEN
      BEGIN -- "+2" lets GetBlock give back the buffer if we take the last byte
      byte ← inputBuffer.ois.sppBytes[inputFinger];
      inputFinger ← inputFinger + 1;
      RETURN;
      END
    ELSE
      BEGIN
      array: PACKED ARRAY [0..1] OF Stream.Byte;
      [] ← sH.get[sH, [@array, 0, 1], [FALSE, FALSE, FALSE, TRUE, TRUE]];
      RETURN[array[0]];
      END;
    END;

  GetWord: PROCEDURE [sH: Stream.Handle] RETURNS [word: Stream.Word] =
    BEGIN OPEN w: LOOPHOLE[word, LeftAndRight];
    w.left ← GetByte[sH];
    w.right ← GetByte[sH];
    END;

  -- This procedure fills a client's block with data from an incoming packet
  GetBlock: PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    -- block has been passed by value, so we are upadting our copy, not the clients
    BEGIN
    input: Stream.Block;
    moved: CARDINAL;
    endOfMessageArrived: BOOLEAN ← FALSE;
    bytesTransferred ← 0;
    why ← normal;
    sst ← inputSST;
    WHILE block.startIndex < block.stopIndexPlusOne DO
      UNTIL inputBuffer # NIL DO
	inputFinger ← 0;
	inputBuffer ← PacketStream.Get[psH];
	IF inputBuffer = NIL THEN SIGNAL Stream.TimeOut[block.startIndex]
	ELSE
	  BEGIN
	  sst ← inputBuffer.ois.subtype;
	  IF inputSST # sst THEN
	    BEGIN
	    inputSST ← sst;
	    IF options.signalSSTChange THEN
	      SIGNAL Stream.SSTChange[inputSST, block.startIndex]
	    ELSE BEGIN why ← sstChange; RETURN; END;
	    END;
	  END;
	ENDLOOP;
      input ←
	[blockPointer: @inputBuffer.ois.sppBytes, startIndex: inputFinger,
	  stopIndexPlusOne: GetSppDataLength[inputBuffer]];
      moved ← ByteBlt.ByteBlt[block, input];
      bytesTransferred ← bytesTransferred + moved;
      block.startIndex ← block.startIndex + moved;
      inputFinger ← inputFinger + moved;
      -- if the packet buffer is empty return it
      IF inputFinger = input.stopIndexPlusOne THEN
		BEGIN -- exhausted the packet contents
		endOfMessageArrived ← inputBuffer.ois.endOfMessage;
		PacketStream.ReturnGetSppDataBuffer[psH, inputBuffer];
		inputBuffer ← NIL;
		END;
      -- if there is no packet buffer and the block is still empty maybe signal
      IF inputBuffer = NIL AND block.startIndex < block.stopIndexPlusOne AND
	options.signalLongBlock THEN
		BEGIN SIGNAL Stream.LongBlock[block.startIndex]; END;
      -- exit if client wants to terminate when endOfMessage bit on
      IF endOfMessageArrived AND options.terminateOnEndPhysicalRecord THEN
		BEGIN why ← endRecord; EXIT; END;
      ENDLOOP;
    -- if there is data in the packet buffer then the block was short
    IF inputBuffer # NIL AND options.signalShortBlock THEN
      BEGIN ERROR Stream.ShortBlock; END;
    END; -- GetBlock

  -- The strategy on the transmission side, is to allocate a buffer only when there is
  -- data to be copied into the buffer, or if an empty packet must be transmitted.
  -- State information is kept around for things like whether a new SST has been sent
  -- to the other end or not, incase it is changed without sending any intervening data.


  -- This procedure sends a client's block of data in one or more packets.
  -- Since SendNow isn't a procedure all by itself, we try to know when the client
  -- did a SendNow, so that we can ask the other end for an ack.  If we have no
  -- buffer, we send a data packet with zero data.
  PutBlock: PROCEDURE [
    sH: Stream.Handle, block: Stream.Block, endPhysicalRecord: BOOLEAN] =
    -- block has been passed by value, so we are updating our copy, not the clients
    BEGIN
    sendNowBlock: Stream.Block = [NIL, 0, 0];
    output: Stream.Block;
    moved: CARDINAL;
    IF sanityChecking THEN TestAndSetSendInProgress[];
    IF (block = sendNowBlock AND endPhysicalRecord) THEN
      BEGIN -- this must be a SendNow operation, or something like it sigh...
      SendNow[];
      IF sanityChecking THEN TestAndResetSendInProgress[];
      RETURN;
      END;
    -- see whether this is a no-op or not.
    IF (block.stopIndexPlusOne - block.startIndex) = 0 AND outputBuffer = NIL AND
      outputSSTSent THEN
      BEGIN
      IF sanityChecking THEN TestAndResetSendInProgress[];
      RETURN;
      END;
    IF outputBufferSize = 0 THEN outputBufferSize ← psH.getSenderSizeLimit[];
    WHILE block.startIndex < block.stopIndexPlusOne DO
      IF outputBuffer = NIL THEN
	BEGIN
	outputBuffer ← PacketStream.GetSendSppBuffer[psH];
	outputBuffer.ois.sendAck ← FALSE;
	outputBuffer.ois.attention ← FALSE;
	outputFinger ← 0;
	END;
      output ←
	[blockPointer: @outputBuffer.ois.sppBytes, startIndex: outputFinger,
	  stopIndexPlusOne: outputBufferSize];
      moved ← ByteBlt.ByteBlt[output, block];
      block.startIndex ← block.startIndex + moved;
      outputFinger ← outputFinger + moved;
      IF outputFinger = outputBufferSize THEN
        BEGIN
        outputBuffer.ois.endOfMessage ← endPhysicalRecord AND block.startIndex >= block.stopIndexPlusOne;
        FlushOutputBuffer[];
        END;
      ENDLOOP;
    IF sanityChecking THEN TestAndResetSendInProgress[];
    END; -- PutBlock

  PutByte: PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    IF sanityChecking THEN TestAndSetSendInProgress[];
    IF outputBuffer # NIL AND outputFinger + 2 < outputBufferSize THEN
      BEGIN -- "+2" lets PutBlock flush the buffer if we fill the last byte
      outputBuffer.ois.sppBytes[outputFinger] ← byte;
      outputFinger ← outputFinger + 1;
      IF sanityChecking THEN TestAndResetSendInProgress[];
      END
    ELSE
      BEGIN
      array: PACKED ARRAY [0..1] OF Stream.Byte ← [byte, ];
      IF sanityChecking THEN TestAndResetSendInProgress[];
      PutBlock[sH, [@array, 0, 1], FALSE];
      END;
    END;

  PutWord: PROCEDURE [sH: Stream.Handle, word: Stream.Word] =
    BEGIN OPEN w: LOOPHOLE[word, LeftAndRight];
    sH.putByte[sH, w.left];
    sH.putByte[sH, w.right];
    END;

  SendNow: PROCEDURE =
    BEGIN
    IF outputBuffer = NIL THEN
      BEGIN
      outputBuffer ← PacketStream.GetSendSppBuffer[psH];
      outputBuffer.ois.attention ← FALSE;
      outputFinger ← 0;
      END;
    outputBuffer.ois.sendAck ← TRUE;
    outputBuffer.ois.endOfMessage ← TRUE;
    FlushOutputBuffer[];
    END; -- SendNow

  -- This procedure sets the SST to the specified value and has some side effects.
  -- We assume that the SST is initially = 0, and the first change causes no empty packet
  -- to be sent
  SetSST: PROCEDURE [sH: Stream.Handle, sst: Stream.SubSequenceType] =
    BEGIN
    IF sanityChecking THEN TestAndSetSendInProgress[];
    IF sst # outputSST THEN
      BEGIN
      FlushOutputBuffer[]; -- flush the last buffer if there was one
      IF NOT outputSSTSent THEN
	-- there was no buffer to flush for the old SST so send an empty packet
	BEGIN
	outputBuffer ← PacketStream.GetSendSppBuffer[psH];
	outputBuffer.ois.sendAck ← FALSE;
	outputBuffer.ois.attention ← FALSE;
	outputFinger ← 0;
	FlushOutputBuffer[];
	END;
      -- remember the new SST
      outputSST ← sst;
      outputSSTSent ← FALSE
      END;
    IF sanityChecking THEN TestAndResetSendInProgress[];
    END; -- SetSSt

  -- This procedure sends one byte of data in a packet with the attention bit set.
  SendAttention: PROCEDURE [sH: Stream.Handle, byte: Stream.Byte] =
    BEGIN
    IF sanityChecking THEN TestAndSetSendInProgress[];
    FlushOutputBuffer[]; -- flush the last buffer if there was one
    outputBuffer ← PacketStream.GetSendSppBuffer[psH];
    outputBuffer.ois.sppBytes[0] ← byte;
    outputFinger ← 1;
    outputBuffer.ois.sendAck ← FALSE;
    outputBuffer.ois.attention ← TRUE;
    FlushOutputBuffer[];
    IF sanityChecking THEN TestAndResetSendInProgress[];
    END; -- SendAttention

  -- This procedure waits indefinately until an attention arrives, or an ERROR is raised.
  WaitAttention: PROCEDURE [sH: Stream.Handle] RETURNS [byte: Stream.Byte] =
    BEGIN
    b: BufferDefs.OisBuffer;
    DO
      b ← PacketStream.WaitForAttention[psH];
      -- Get the first data byte, if there is one, otherwise discard this attention packet.
      -- Discarding the attention packet upsets the timeout mechanism, but then this is
      -- a situation that is not supposed to happen, and so we pay the price.
      IF GetSppDataLength[b] # 0 THEN
	BEGIN byte ← b.ois.sppBytes[0]; OISCP.ReturnFreeOisBuffer[b]; EXIT; END
      ELSE OISCP.ReturnFreeOisBuffer[b];
      ENDLOOP;
    END; -- WaitAttention

  -- This procedure flushes (i.e. sends out) the outputBuffer if there is one.
  FlushOutputBuffer: PROCEDURE =
    BEGIN
    b: BufferDefs.OisBuffer;
    -- don't leave outputBuffer dangling in case of Stream deletion
    IF outputBuffer = NIL THEN RETURN;
    b ← outputBuffer;
    outputBuffer ← NIL;
    SetSppDataLength[b, outputFinger];
    b.ois.subtype ← outputSST;
    outputSSTSent ← TRUE;
    -- now put the buffer to the packet stream.
    -- if the stream is PacketStream.ConnectionSuspended, the buffer is returned and the
    -- data it contained is lost. This is ok since no more useful can be done with this
    -- stream anyway. NOTE that the signal is allowed to continue down the stack s.t.
    -- the client can catch it also.
    PacketStream.Put[psH, b ! PacketStream.ConnectionSuspended =>
      BEGIN
      OISCP.ReturnFreeOisBuffer[b];
      IF sanityChecking THEN TestAndResetSendInProgress[];
      END ];
    END; -- FlushOutputBuffer

  -- This procedure sets the length of the sequenced packet given the length of data.
  SetSppDataLength: PROCEDURE [b: BufferDefs.OisBuffer, length: CARDINAL] =
    INLINE
    BEGIN
    OISCP.SetOisPacketTextLength[
      b, length + OISCPTypes.bytesPerLevel2SppHeader];
    END; -- SetSppDataLength

  -- This procedure returns the amount of data in the sequenced packet.
  GetSppDataLength: PROCEDURE [b: BufferDefs.OisBuffer] RETURNS [CARDINAL] =
    INLINE
    BEGIN
    RETURN[
      OISCP.GetOisPacketTextLength[b] - OISCPTypes.bytesPerLevel2SppHeader];
    END; -- GetSppDataLength


  -- Cool Procedures

  -- This procedure is instrumental in deleting this transducer.
  Delete: PUBLIC PROCEDURE [sH: Stream.Handle] =
    BEGIN
    IF sanityChecking THEN TestAndSetSendInProgress[];
    controlObject.streamObject ←
      [options: controlObject.streamObject.options, getByte: NIL, putByte: NIL,
      getWord: NIL, putWord: NIL, get: NIL, put: NIL,
      setSST: NIL, sendAttention: NIL, waitAttention: NIL,
      delete: NIL];
    IF inputBuffer # NIL THEN
      PacketStream.ReturnGetSppDataBuffer[psH, inputBuffer];
    IF outputBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[outputBuffer];
    PacketStream.Destroy[psH];
    Runtime.SelfDestruct[];
    -- no resetting of send sanity check here!
    END; -- Delete


  -- initialization (Cool)

  RETURN[@controlObject.streamObject];
  END.  -- of NetworkStreamInstance module

LOG

Time: May 26, 1978  11:22 AM  By: Dalal  Action: created file.
Time: November 9, 1978  9:07 AM  By: Dalal  Action: modified GetBlock and PutBlock.
Time: March 13, 1979  6:03 PM  By: Dalal  Action: modified SendAttention and WaitAttention.
Time: August 31, 1979  12:39 PM  By: Dalal  Action: made two INLINEs.
Time: January 31, 1980  4:33 AM  By: Forrest  Action: Added Mandatory fields to Stream.Object, using Stream Defaults.
Time: July 8, 1980  5:07 PM  By: BLyon  Action: removed default procs and replaced with GetByte, GetWord, PutByte, PutWord.
Time: August 11, 1980  4:57 PM  By: BLyon  Action: Changed ByteBltDefs to ByteBlt 
Time: September 26, 1980  2:19 PM  By: Garlick  Action: In GetBlock, initialized sst to inputSST so it always gets returned with proper value. 
Time: January 23, 1981  1:41 PM  By: Garlick  Action: Added setting of the EndOfMessage bit in the packet when endPhysicalRecord set in PutBlock.  Added interpretation of endPhysicalRecord (as propogated from the sender) in GetBlock.  Also made WaitAttention generate the resumeable Stream.Timeout. 
Time: March 18, 1981  3:47 PM  By: BLyon  Action: Put NIL's in Delete