// GrapevineLocate.bcpl // Copyright Xerox Corporation 1981, 1982, 1983 // Last modified October 21, 1983 3:49 PM by Taft get "Pup0.decl" get "Pup1.decl" get "Grapevine.decl" get "GrapevineProtocol.decl" get "GrapevineInternal.decl" external [ // outgoing procedures FindServer // incoming procedures ReadRList; DestroyRList; ReadRString; GVCreateStream OpenLevel1Socket; CloseLevel1Socket; CompletePup GetPBI; ReleasePBI; SetPupDPort; LocateNet EnumeratePupAddresses InitializeContext; Enqueue; Dequeue; Unqueue; QueueLength StringCompare Block; Dismiss; SetTimer; TimerHasExpired Allocate; Free; MoveBlock; Zero // incoming statics @gus; pupCtxQ; ndbQ ] structure ARec: // ATable record for R-Server probing [ requestPort @Port // port to prod hops word // distance from here; zero if preSorted // The replyPort doesn't have anything to do with the corresponding requestPort. // They are kept in the same data structure in order to avoid having two separate // tables. (The number of potential replies is essentially the same as // the number of requests.) replyPort @Port ] manifest lenARec = size ARec/16 structure ATable: // Address table for R-Server probing [ count word // number of ARecs maxCount word // maximum number of ARecs that will fit in ATable preSorted word // true iff ARecs are already sorted by hop count nReplies word // number of replies received rec^0,0 @ARec ] manifest lenATableHeader = offset ATable.rec/16 //---------------------------------------------------------------------------- let FindServer(serviceName, pollingSocket, proc, arg) = valof //---------------------------------------------------------------------------- // Attempts to find an instance of the service with the specified name. // If serviceName contains a ".", treats it as an RName (which must be // a Grapevine group) and attempts to locate the nearest functioning instance // of the service among the group's members (which must be individuals // whose connect sites are valid Pup address constants). // If serviceName does not contain a ".", treats it as an NLS name and // attempts to locate the nearest functioning instance of the service without // consulting Grapevine (a local broadcast is also issued). // In either case, for each potential instance of the service, calls proc(port, arg), // which should attempt to open a connection to the given port and return true // if successful and false if not (arg may be used to communicate additional // parameters and/or results). Proc should at least default and perhaps // unconditionally set the port's socket field to the appropriate well-known // socket number before using it, since in general the service socket is distinct // from the polling socket. Note that proc will be called repeatedly until either // it returns true or the list of potential instances is exhausted. // If FindServer is successful, it returns zero; if unsuccessful, it returns one of // ecBadRName (there is no such service) or ecAllDown (can't contact any instance of // the service). // The service to be located must respond to EchoMe requests on a well-known // socket, the low 16 bits of which are given as pollingSocket and the // high 16 bits of which are gus>>GUS.socketHigh (zero except when testing). Any // service (Grapevine or non-Grapevine) obeying this convention may be located, // regardless of whether it is named by an RName or an NLS name. // The caller is assumed NOT to have done a GVClaimStream(), since FindServer may // call the Grapevine package recursively to read group memberships and such. // If proc manipulates the stream then it must call GVClaimStream internally. [ let zone = gus>>GUS.zone let ec = ecAllDown // see what kind of name, and set up some defaults if StringCompare(serviceName, "GV.GV") eq 0 then // prevent infinite recursion in the Grapevine name space. (This should // not actually happen, but this test is here anyway for safety.) serviceName = "GrapevineRServer" let haveRName = IsRName(serviceName) // FindServer (cont'd) // obtain table of connect ports for service let aTable = 0 test haveRName ifnot [ // Obtain list of addresses by looking up serviceName as an NLS-name, // and also by broadcasting. manifest maxAddrs = 10 // we'll take the 10 closest manifest lenATable = lenATableHeader + maxAddrs*lenARec aTable = Allocate(zone, lenATable); Zero(aTable, lenATable) aTable>>ATable.maxCount = maxAddrs // First request will be a local broadcast. // Remaining requests will be to ports registered in NLS. // Note that EnumeratePupAddresses gives them closest first. aTable>>ATable.preSorted = true AccumulateGVPort(table [ 0; 0; 0 ], aTable) EnumeratePupAddresses(serviceName, AccumulateGVPort, aTable) ] ifso [ // obtain the list by enumerating serviceName as a Grapevine group // and then obtaining the connect site for each member. let rList = ReadRList(serviceName, opReadMembers, lv ec) if rList ne 0 then [ let count = QueueLength(lv rList>>RList.queue) let lenATable = lenATableHeader + count*lenARec aTable = Allocate(zone, lenATable); Zero(aTable, lenATable) aTable>>ATable.maxCount = count let rItem = rList>>RList.queue.head while rItem ne 0 do [ // get connect site string for member and convert to address let member = lv rItem>>RItem.rName let memberIsRName = IsRName(member) let connect = memberIsRName? ReadRString(lv rItem>>RItem.rName, opReadConnect), member if connect ne 0 then [ // suppress usual reachability check, since we will do our own // computation based on hop counts later on. let port = vec lenPort if EnumeratePupAddresses(connect, 0, port, true) eq 0 then AccumulateGVPort(port, aTable, nil) if memberIsRName then Free(zone, connect) ] rItem = rItem>>RItem.next ] DestroyRList(rList) ] ] if aTable eq 0 resultis ec // ecBadRName or ecAllDown // FindServer (cont'd) // Now things start to get tricky... // Send Echo packets to each socket in aTable, in order of hop count so as // to bias the choice to nearby servers. As replies come back from each // server, attempt to establish a byte stream to that server; and terminate // the polling process as soon as this is successful. // This is done by three processes in order to prevent deadlocks due to // hogging PBIs. The Prodder sends out EchoMe Pups to the addresses in // the table. The Slurper receives the IAmEcho responses, and marks the // table. Meanwhile, the original process reads the table and attempts to // establish connections to the marked addresses. let soc = vec lenPupSoc OpenLevel1Socket(soc, 0, 0, true) // transient default local socket let nRoutesFound = 0 for try = 1 to 4 do // try the whole process up to 4 times [ unless nRoutesFound eq aTable>>ATable.count % aTable>>ATable.preSorted do // try to get all the hop counts. Note: hop counts were initialized // to zero if preSorted, maxHops+1 otherwise. for routeTry = 0 to 10 do [ let dontProbe = (routeTry & 1) eq 0 for i = 0 to aTable>>ATable.count-1 do [ let aRec = lv aTable>>ATable.rec^i if aRec>>ARec.hops gr maxHops then [ let rte = LocateNet(aRec>>ARec.requestPort.net, dontProbe) if rte ne 0 then [ aRec>>ARec.hops = rte>>RTE.hops if aRec>>ARec.hops le maxHops then nRoutesFound = nRoutesFound+1 ] ] ] if nRoutesFound eq aTable>>ATable.count % nRoutesFound gr 0 & routeTry ge 2 then break unless dontProbe do Dismiss(100) ] aTable>>ATable.nReplies = 0 // no replies yet let nextReplyToExamine = 0 // create Prodder & Slurper processes let ctxTable = vec 1 for i = 0 to 1 do [ let ctx = InitializeContext(Allocate(zone, 150), 150, (i eq 0? Prodder, Slurper), 3) ctx!3 = aTable; ctx!4 = soc // args needed by Prodder & Slurper ctx!5 = pollingSocket // arg needed by Prodder only Enqueue(pupCtxQ, ctx) ctxTable!i = ctx ] // FindServer (cont'd) // collect responses produced by Prodder & Slurper [ // repeat // wait for reply to be returned by Slurper let timer = nil; SetTimer(lv timer, 150) // 1.5 seconds Dismiss(1) repeatuntil aTable>>ATable.nReplies gr nextReplyToExamine % TimerHasExpired(lv timer) if aTable>>ATable.nReplies eq nextReplyToExamine then break // timed out // have a candidate server; try to access it. let port = lv aTable>>ATable.rec^nextReplyToExamine.replyPort nextReplyToExamine = nextReplyToExamine+1 if proc(port, arg) then [ ec = 0; break ] // successfully accessed service ] repeat // destroy Prodder & Slurper processes for i = 0 to 1 do [ Unqueue(pupCtxQ, ctxTable!i); Free(zone, ctxTable!i) ] if ec eq 0 then break // successfully opened stream ] CloseLevel1Socket(soc) Free(zone, aTable) resultis ec // zero (successful) or ecAllDown ] //---------------------------------------------------------------------------- and AccumulateGVPort(port, aTable, nil; numargs na) = valof //---------------------------------------------------------------------------- // 2-argument call is from NLS expansion: don't accumulate specific addresses on // the directly-connected network, since they will be reached by the initial broadcast. // 3-argument call is from R-Name expansion: set hops field to maxHops+1. [ let count = aTable>>ATable.count if count eq aTable>>ATable.maxCount resultis true let aRec = lv aTable>>ATable.rec^count test na eq 2 ifso if port>>Port.net eq (ndbQ!0)>>NDB.localNet & port>>Port.host ne 0 then resultis false ifnot aRec>>ARec.hops = maxHops+1 MoveBlock(lv aRec>>ARec.requestPort, port, lenPort) aTable>>ATable.count = count+1 resultis false ] //---------------------------------------------------------------------------- and IsRName(string) = valof //---------------------------------------------------------------------------- // Returns true iff string contains a period. [ for i = 1 to string>>String.length do if string>>String.char^i eq $. then resultis true resultis false ] // Prodder and Slurper need not be OEPs, because the processes that execute // them exist only during the lifetime of FindServer, which is in the // same module as these. //---------------------------------------------------------------------------- and Prodder(ctx) be //---------------------------------------------------------------------------- [ let aTable = ctx!3 let soc = ctx!4 let pollingSocket = ctx!5 manifest lastHops = 3 // treat >3 hops as equivalent to 3 for hops = 0 to lastHops do for i = 0 to aTable>>ATable.count-1 do [ let aRec = lv aTable>>ATable.rec^i if aRec>>ARec.hops eq hops % (hops eq lastHops & aRec>>ARec.hops gr hops & aRec>>ARec.hops le maxHops) then [ let pbi = GetPBI(soc) SetPupDPort(pbi, lv aRec>>ARec.requestPort) pbi>>PBI.pup.dPort.socket^1 = gus>>GUS.socketHigh pbi>>PBI.pup.dPort.socket^2 = pollingSocket CompletePup(pbi, ptEchoMe, pupOvBytes) // space requests 100 ms apart so as to favor earlier // requests and avoid hogging PBIs. Dismiss(10) ] ] Block() repeat ] //---------------------------------------------------------------------------- and Slurper(ctx) be //---------------------------------------------------------------------------- [ let aTable = ctx!3 let soc = ctx!4 [ // repeat Block() repeatuntil soc>>PupSoc.iQ.head ne 0 let pbi = Dequeue(lv soc>>PupSoc.iQ) if pbi>>PBI.pup.type eq ptImAnEcho then [ let i = aTable>>ATable.nReplies if i uls aTable>>ATable.count then [ let aRec = lv aTable>>ATable.rec^i MoveBlock(lv aRec>>ARec.replyPort, lv pbi>>PBI.pup.sPort, lenPort) aTable>>ATable.nReplies = i+1 ] ] ReleasePBI(pbi) ] repeat ]