// PupBSPStreams.bcpl -- Byte Stream Protocol
// Companion files are PupBSPOpenClose, PupBSPProt and PupBSPa
// This module contains the 'main line' code.
// Copyright Xerox Corporation 1979, 1980, 1981, 1982
// Last modified May 3, 1982 11:32 AM by Taft
get "Pup.decl"
external
[
// outgoing procedures
BSPGetByte; BSPPrepareIPBI; BSPGetCleanup; BSPGetMark
BSPPutByte; BSPPrepareOPBI; BSPForceOutput; BSPPutMark
BSPCloses; BSPEndofs; BSPErrors
// incoming procedures
BSPGets; BSPPuts; CloseBSPSocket
SendAck; TransmitStrategy; SetTimeout
ReleasePBI; GetPBI; CompletePup; SetPupID
Zero; MoveBlock; CallSwat; SysErr; Gets; Puts; Errors
DoubleIncrement; SetTimer; TimerHasExpired; Block
Enqueue; Dequeue; Min
// outgoing statics
bspVersion
// incoming statics
offsetBSPStr
]
// bspVersion is exported: it is listed under "outgoing statics" in the
// external declaration above.
// NOTE(review): how 1011 divides into protocol vs. package version is not
// shown in this file -- confirm before changing it.
static [ bspVersion = 1011 ] //protocol and package versions
//---------------------------------------------------------------------------
let BSPCloses(str) = valof
// Close operation for a BSP stream. str-offsetBSPStr converts the stream
// pointer to its socket (the same conversion used by the other stream
// procedures in this file); the socket is handed to CloseBSPSocket and
// that procedure's result is returned unchanged.
[
let soc = str-offsetBSPStr
resultis CloseBSPSocket(soc)
]
//---------------------------------------------------------------------------
//----------------------------------------------------------------------------
and BSPErrors(str, ec) = valof
//----------------------------------------------------------------------------
// Default BSP stream error handler.
// str is the stream and ec the error code. The three Gets-side codes
// produce -1, the two Puts-side codes produce false, and every other code
// is passed to SysErr(str, ec), whose result is returned.
[
switchon ec into
   [
   case ecBadStateForGets:
   case ecGetsTimeout:
   case ecMarkEncountered:
      resultis -1
   case ecBadStateForPuts:
   case ecPutsTimeout:
      resultis false
   default:
      resultis SysErr(str, ec)
   ]
]
//----------------------------------------------------------------------------
and BSPGetByte(str, timeout; numargs na) = valof
//----------------------------------------------------------------------------
// Stream Gets procedure used while the socket may have no current iPBI.
// timeout is optional: when the caller omits it, -1 is passed on, which
// BSPPrepareIPBI treats as "never time out".
// Returns Gets(str) once an input PBI is in place, otherwise whatever the
// stream's Errors procedure returns for the failure code.
// BSPPrepareIPBI stores BSPGets in the gets field on success, so presumably
// the Gets below dispatches there rather than back to this procedure.
[
let wait = (na gr 1? timeout, -1)
let status = BSPPrepareIPBI(str-offsetBSPStr, wait)
test status eq 0
   ifso resultis Gets(str)
   ifnot resultis Errors(str, status)
]
//----------------------------------------------------------------------------
and BSPPrepareIPBI(soc, timeout) = valof
//----------------------------------------------------------------------------
// Attempts to get a new IPBI if necessary.
// Returns zero normally and an error code upon failure.
//
// soc is the BSP socket. timeout is handed to SetTimer; the value -1
// disables the expiry test, so the loop then ends only when a PBI is
// available or the socket state no longer permits reading.
// Error codes returned: ecBadStateForGets, ecGetsTimeout, ecMarkEncountered.
// ecMarkEncountered is returned whenever markPending is set, including the
// case where the iPBI was already in place on entry; BSPGetMark is what
// clears markPending.
// On the zero return the gets field has been set to BSPGets.
[
let timer = nil; SetTimer(lv timer, timeout)
[ // repeat
// Block is called before every test, including the first one.
// (Presumably it yields to other contexts -- confirm.)
Block()
if soc>>BSPSoc.iPBI ne 0 then break // someone else already set it up
if soc>>BSPSoc.unReadPups ne 0 then
[ // new PBI ready to read
let pbi = Dequeue(lv soc>>BSPSoc.bspIQ)
// Debug builds only: unReadPups said there was a PBI, so an empty queue
// means the count and the queue disagree.
compileif pupDebug then
[ if pbi eq 0 then CallSwat("[BSPPrepareIPBI] Input Q fouled up") ]
soc>>BSPSoc.iPBI = pbi
// Compile-time check of the type ordering that the single "ge typeMark"
// comparison below depends on: typeData and typeAData must be below
// typeMark, and typeAMark must not be.
// NOTE(review): Barf is not in this file's external list; presumably the
// body is only compiled (and then fails to build) when the check trips.
compileif typeData ge typeMark % typeAData ge typeMark %
typeAMark ls typeMark then
[ Barf("Pup type inequality assumption invalid") ]
soc>>BSPSoc.markPending = pbi>>PBI.pup.type ge typeMark // Mark or AMark
// Point the input cursor at the start of the Pup contents; the byte count
// is the Pup length less the pupOvBytes overhead.
soc>>BSPSoc.iWordP = lv pbi>>PBI.pup.words
soc>>BSPSoc.iByteP = 0
soc>>BSPSoc.iByteC = pbi>>PBI.pup.length-pupOvBytes
break
]
// don't test state until after checking unReadPups so that a socket in
// stateEndIn won't get an error until all received PBIs have been read.
let state = soc>>BSPSoc.state
unless state eq stateOpen % state eq stateEndOut resultis ecBadStateForGets
if timeout ne -1 & TimerHasExpired(lv timer) resultis ecGetsTimeout
] repeat
// Reached only via break, i.e. with an iPBI in place.
test soc>>BSPSoc.markPending
ifso resultis ecMarkEncountered
ifnot [ soc>>BSPSoc.gets = BSPGets; resultis 0 ]
]
//----------------------------------------------------------------------------
and BSPGetCleanup(soc, dataByte) = valof
//----------------------------------------------------------------------------
// Retires the current input Pup. BSPGets calls this after taking the last
// byte of that Pup; dataByte is passed straight through as the result.
[
let donePBI = soc>>BSPSoc.iPBI

// Advance the user's byte ID by the number of data bytes in the Pup
// (its length less the pupOvBytes overhead), then give the PBI back.
DoubleIncrement(lv soc>>BSPSoc.userByteID, donePBI>>PBI.pup.length-pupOvBytes)
ReleasePBI(donePBI)

// No current input PBI any more: one fewer unread Pup, and the next Gets
// has to go through BSPGetByte to set up a new one.
soc>>BSPSoc.iPBI = 0
soc>>BSPSoc.unReadPups = soc>>BSPSoc.unReadPups-1
soc>>BSPSoc.gets = BSPGetByte

// A gratuitous ack is sent only if a zero allocation was previously sent
// AND at least half of the socket's allocation has become available again.
// The second condition avoids the Pup equivalent of what the Arpa Internet
// people call "Silly Window Syndrome".
if soc>>BSPSoc.sentZeroAlloc then
   [
   if soc>>BSPSoc.unReadPups le soc>>BSPSoc.maxIPBI rshift 1 then
      SendAck(soc)
   ]
resultis dataByte
]
//----------------------------------------------------------------------------
and BSPEndofs(str) = valof
// End-of-stream test for a BSP stream.
// If the stream's gets procedure is BSPGets (set by BSPPrepareIPBI once an
// input PBI is ready) the answer is false without further work.
// Otherwise BSPPrepareIPBI is called with a timeout of 0, and the answer is
// true exactly when that call returns a nonzero error code.
[
if str>>BSPStr.gets eq BSPGets then resultis false
resultis BSPPrepareIPBI(str-offsetBSPStr, 0) ne 0
]
//----------------------------------------------------------------------------
//----------------------------------------------------------------------------
and BSPPutByte(str, dataByte, timeout; numargs na) = valof
//----------------------------------------------------------------------------
// Stream Puts procedure used while the socket may have no current oPBI.
// timeout is optional: when the caller omits it, -1 is passed on, which
// BSPPrepareOPBI treats as "never time out".
// Returns Puts(str, dataByte) once an output PBI is in place, otherwise
// whatever the stream's Errors procedure returns for the failure code.
[
let soc = str-offsetBSPStr
let status = BSPPrepareOPBI(soc, (na gr 2? timeout, -1))
resultis status eq 0? Puts(str, dataByte), Errors(str, status)
]
//----------------------------------------------------------------------------
and BSPPrepareOPBI(soc, timeout) = valof
//----------------------------------------------------------------------------
// Makes sure the socket has a current output PBI (oPBI), setting up a new
// one if necessary and looping until that is possible.
// timeout is handed to SetTimer; -1 disables the expiry test.
// Result: 0 when soc has an oPBI on return, otherwise ecBadStateForPuts or
// ecPutsTimeout.
[
let timer = nil
SetTimer(lv timer, timeout)
   [ // repeat
   Block()
   unless soc>>BSPSoc.oPBI eq 0 do break // somebody else already supplied one

   // Output is permitted only in stateOpen and stateEndIn.
   let socState = soc>>BSPSoc.state
   if socState ne stateOpen & socState ne stateEndIn then
      resultis ecBadStateForPuts

   test soc>>BSPSoc.pupAlloc gr 0 & soc>>BSPSoc.byteAlloc gr 0 &
    soc>>BSPSoc.numTPBI gr 1 & soc>>BSPSoc.numOPBI gr 1
      ifso
         [ // allocation and PBI counts allow a new output PBI
         let newPBI = GetPBI(soc)
         newPBI>>PBI.pup.type = typeData
         // Capacity: a full Pup's worth, capped by the remaining byte allocation.
         newPBI>>PBI.pup.length = Min(soc>>BSPSoc.bytesPerPup, soc>>BSPSoc.byteAlloc)
         soc>>BSPSoc.oPBI = newPBI
         soc>>BSPSoc.oByteC = newPBI>>PBI.pup.length
         soc>>BSPSoc.oByteP = 0
         soc>>BSPSoc.oWordP = lv newPBI>>PBI.pup.words
         soc>>BSPSoc.puts = BSPPuts // assembly-language version
         break
         ]
      ifnot
         [
         // Nothing available on this pass; give up only if a timeout was
         // requested and it has run out.
         if timeout ne -1 & TimerHasExpired(lv timer) then resultis ecPutsTimeout
         ]
   ] repeat
resultis 0
]
//----------------------------------------------------------------------------
and BSPForceOutput(soc, sendNow; numargs na) = valof
//----------------------------------------------------------------------------
// Hands the socket's current output Pup, if any, over for transmission.
// BSPPuts calls this when it has completely filled the Pup; other callers
// use it to push out a partially filled one.
// sendNow is optional and is looked at only when it was actually supplied.
// When true the Pup goes out as an AData or AMark, forcing an immediate
// acknowledgment so the output stream gets cleaned up quickly.
// Always returns true (the result is for BSPPuts).
[
let outPBI = soc>>BSPSoc.oPBI
if outPBI eq 0 then resultis true // no output Pup in progress

// pup.length was set to the Pup's capacity when the PBI was prepared and
// oByteC is subtracted from it here (apparently the room still unused), so
// the difference is the number of data bytes in the Pup.
let dataBytes = outPBI>>PBI.pup.length - soc>>BSPSoc.oByteC
outPBI>>PBI.pup.length = dataBytes+pupOvBytes

// The Pup ID is taken from xmitByteID before xmitByteID is advanced.
SetPupID(outPBI, lv soc>>BSPSoc.xmitByteID)
DoubleIncrement(lv soc>>BSPSoc.xmitByteID, dataBytes)

// One more Pup and dataBytes more bytes are unacknowledged, and the same
// amounts are deducted from the remaining allocation.
soc>>BSPSoc.unAckedPups = soc>>BSPSoc.unAckedPups+1
soc>>BSPSoc.unAckedBytes = soc>>BSPSoc.unAckedBytes+dataBytes
soc>>BSPSoc.pupAlloc = soc>>BSPSoc.pupAlloc-1
soc>>BSPSoc.byteAlloc = soc>>BSPSoc.byteAlloc-dataBytes

// The socket no longer has a current output PBI; the next Puts must go
// through BSPPutByte to obtain one.
soc>>BSPSoc.oPBI = 0
soc>>BSPSoc.oByteC = 0
soc>>BSPSoc.puts = BSPPutByte

outPBI>>PBI.queue = lv soc>>BSPSoc.bspTQ //return pbi to bspTQ
outPBI>>PBI.timer = soc>>BSPSoc.aDataCount
TransmitStrategy(outPBI, na gr 1 & sendNow)
CompletePup(outPBI)
SetTimeout(soc)
resultis true // for BSPPuts
]
//----------------------------------------------------------------------------
and BSPGetMark(soc) = valof
//----------------------------------------------------------------------------
// Consumes a pending Mark on the socket.
// soc is the BSP socket (not the stream). If no Mark is pending, the
// stream's Errors procedure is invoked with ecBadBSPGetMark and its result
// is returned. Otherwise markPending is cleared and the result of Gets on
// the socket's stream is returned (presumably the mark byte -- confirm).
[
// Errors is a stream operation, so it must be given the stream
// (soc+offsetBSPStr), just like the Gets below and the Errors call in
// BSPPutMark. Passing the bare socket would dispatch through the wrong
// structure.
unless soc>>BSPSoc.markPending resultis Errors(soc+offsetBSPStr, ecBadBSPGetMark)
soc>>BSPSoc.markPending = false
resultis Gets(soc+offsetBSPStr)
]
//----------------------------------------------------------------------------
and BSPPutMark(soc, markByte, timeout, sendNow; numargs na) = valof
//----------------------------------------------------------------------------
// Sends markByte in a Pup of its own whose type is typeMark.
// soc is the BSP socket (not the stream). timeout and sendNow are optional:
// timeout is used only when at least 3 arguments were supplied (otherwise
// -1 is passed to BSPPrepareOPBI), and sendNow only when all 4 were.
// Returns true on success. If BSPPrepareOPBI fails, returns the result of
// the stream's Errors procedure for that error code.
[
// This is hairy. We want to put the markByte in a fresh PBI. Just calling
// BSPForceOutput followed by Puts is not good enough, because Puts calls
// BSPPrepareOPBI which may Block awaiting allocation, and meanwhile some
// other context might do a Puts of some data bytes, which our Puts would
// then append to.
[ // repeat
// Flush whatever is currently buffered, then obtain an output PBI; retry
// if the PBI we end up with already has bytes in it.
BSPForceOutput(soc)
let ec = BSPPrepareOPBI(soc, (na ge 3? timeout, -1))
if ec ne 0 resultis Errors(soc+offsetBSPStr, ec)
// this is how we tell whether we now have a fresh PBI:
// (BSPPrepareOPBI sets oByteC equal to pup.length when it creates one)
] repeatuntil soc>>BSPSoc.oByteC eq soc>>BSPSoc.oPBI>>PBI.pup.length
// BSPPrepareOPBI set the type to typeData; override it for this Pup.
soc>>BSPSoc.oPBI>>PBI.pup.type = typeMark
Puts(soc+offsetBSPStr, markByte) // guaranteed to succeed
// Send the Mark Pup right away rather than letting later data join it.
BSPForceOutput(soc, na gr 3 & sendNow)
resultis true
]