-- Copyright (C) 1981, 1982, 1984, 1985  by Xerox Corporation. All rights reserved.
-- ReadInput.mesa, Transport Mechanism Mail Server - algorithm to process input queue

-- HGM, 17-Oct-85 21:58:07
-- Randy Gobbel		19-May-81 23:20:09
-- Andrew Birrell	30-Dec-81 15:25:29
-- Mark Johnson 	 7-Jan-82 15:28:38
-- Ted Wobber	 	 2-Nov-82 11:08:23
-- Brenda Hankins 	15-Aug-84 16:19:11

-- Overview: this monitor drains the steering-list queues (input, pending,
-- mailbox and express).  For each queued message it reads the steering list,
-- classifies every recipient through the site cache, and then writes the
-- message to a local mailbox, to a per-server forwarding list, to the
-- pending or locked lists, or to a bad-recipient list that is handed to
-- ReturnDefs.

DIRECTORY
  BodyDefs USING [maxRNameLength, PackedTime, RName, Timestamp],
  Buffer USING [AccessHandle, GetBuffer, MakePool],
  Heap USING [systemZone],
  HeapDefs USING [
    HeapReadData, HeapReadRName, HeapEndRead, HeapStartWrite, HeapWriteData,
    HeapWriteRName, ObjectNumber, ReaderHandle, ReadRList, WriterHandle],
  Inline USING [LongCOPY],
  LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry],
  LogPrivateDefs USING [startUpTime],
  MailboxDefs USING [MBXWrite, Remail, WaitForUnlocked],
  NameInfoDefs USING [Close, Enumerate],
  PolicyDefs USING [
    CheckOperation, EndOperation, Operation, ReadPendingPause, Wait, WaitOperation],
  PupDefs USING [
    PupAddress, PupBuffer, PupRouterSendThis, SetPupContentsWords,
    UniqueLocalPupAddress],
  Process USING [Detach, DisableTimeout, Pause],
  ProtocolDefs USING [AppendTimestamp, mailServerPollingSocket],
  RestartDefs USING [] --EXPORT -- ,
  ReturnDefs USING [BadGroup, BadRecipients, LongTerm],
  ServerDefs USING [EnumerateAll, ServerHandle],
  SiteCacheDefs USING [
    FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush],
  SLDefs USING [
    GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite, SLQueue, SLReadHandle,
    SLTransfer, WaitForNonEmpty],
  String USING [AppendChar, AppendDecimal, AppendString, AppendLongDecimal, EquivalentStrings],
  Time USING [Current, Packed, Unpack];

ReadInput: MONITOR
  IMPORTS
    Buffer, Heap, HeapDefs, LogDefs, LogPrivateDefs, Inline, MailboxDefs, NameInfoDefs,
    PolicyDefs, PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs,
    SiteCacheDefs, SLDefs, String, Time
  EXPORTS MailboxDefs, RestartDefs =
  BEGIN

  -- Raised when fewer than SIZE[SLDefs.SLHeader] words could be read from
  -- the front of a steering list (see Sort and USPS).
  BadSL: ERROR = CODE;

  -- statistics --
  -- Running totals, updated by Sort each time it writes a "Delivered" log entry.
  messageTotal, localTotal, remoteTotal, badTotal: LONG CARDINAL ¬ 0;
  foreignTotal, dlTotal, skippedTotal: LONG CARDINAL ¬ 0;

  -- Single shared buffer for the per-message recipient log line.  Sort resets
  -- its length to zero for every message; Sort runs under the sorterInUse
  -- exclusion provided by ClaimSorter and ReleaseSorter.
  recipientLog: LONG STRING ¬ Heap.systemZone.NEW[StringBody[150]];  -- about 10 recipients --

  -- Appends " name" to recipientLog when the name fits with room left over
  -- for a later " ..." marker; otherwise appends " ..." if that still fits;
  -- otherwise does nothing.
  -- NOTE(review): nothing records that the marker was already written, so a
  -- later short name can follow it and the marker can be appended again.
  -- Confirm whether that is acceptable in the log.
  NoteRecipient: PROC [name: BodyDefs.RName] =
    BEGIN
    dot: STRING = "..."L;
    -- The two "+ 1" terms are the separating blank before the name and the
    -- blank that would precede a later ellipsis.
    IF recipientLog.length + 1 + name.length + 1 + dot.length < recipientLog.maxlength THEN
      BEGIN
      String.AppendChar[recipientLog, ' ];
      String.AppendString[recipientLog, name];
      END
    ELSE
      IF recipientLog.length + 1 + dot.length < recipientLog.maxlength THEN
        BEGIN
        String.AppendChar[recipientLog, ' ];
        String.AppendString[recipientLog, dot];
        END;
    END;

  -- time-outs --
  -- Limits passed to ElapsedDays by Sort: shortTermLimit is applied to the
  -- header's "received" stamp and longTermLimit to its "created" stamp.
  shortTermLimit: CARDINAL = 2 --days-- ;
  longTermLimit: CARDINAL = 4 --days-- ;
  -- While TRUE, ElapsedDays always answers FALSE.  Starts TRUE and is
  -- recomputed by SeeIfTimeToStartTimeouts.
  timeoutDisabled: BOOLEAN ¬ TRUE;

  -- Pool of Pup buffers used by TellOtherServer; created in ReadInput1.
  bufferPool: Buffer.AccessHandle;
  -- Source address placed in every cache-flush Pup sent by TellOtherServer.
  flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL];

  -- Sends one Pup of type 215B to the mail server polling socket at the
  -- net and host taken from "received".  The Pup contents are a word copy of
  -- the StringBody of "name".  Nothing is sent when either the net or the
  -- host in "received" is zero.
  TellOtherServer: PROC [received: BodyDefs.Timestamp, name: BodyDefs.RName] =
    BEGIN
    -- non-local name, tell the M-Server that gave us the msg to flush its cache
    IF received.net # 0 AND received.host # 0 THEN
      BEGIN
      b: PupDefs.PupBuffer = Buffer.GetBuffer[
        type: pup, aH: bufferPool, function: send];
      b.pup.dest ¬ [
        net: [received.net], host: [received.host],
        socket: ProtocolDefs.mailServerPollingSocket];
      b.pup.source ¬ flushSource;
      -- Copy the whole string body (header words plus name.length characters).
      Inline.LongCOPY[
        from: name, to: @(b.pup.pupWords),
        nwords: SIZE[StringBody [name.length]]];
      PupDefs.SetPupContentsWords[b, SIZE[StringBody [name.length]]];
      b.pup.pupType ¬ LOOPHOLE[215B];
      PupDefs.PupRouterSendThis[b];
      END
    -- ELSE message came to us from a client -- ;
    END;

  -- Processes one queued message.
  --   handle: queue read handle, released with SLDefs.SLEndRead at the end.
  --   body:   the message body object, passed on to mailbox writes, steering
  --           list writes and the ReturnDefs procedures.
  --   SL:     reader positioned at the start of the steering list: an
  --           SLDefs.SLHeader followed by recipient RNames.
  -- Raises BadSL if the header cannot be read in full.
  Sort: PROCEDURE [
    handle: SLDefs.SLReadHandle, body: HeapDefs.ObjectNumber,
    SL: HeapDefs.ReaderHandle] =
    BEGIN
    -- Despite its name, this procedure has no sorting algorithm as such,
    -- since the delivery sites for a message are determined in a single
    -- pass through its steering list

    -- Output lists, each created lazily on first use.
    pendingSL: HeapDefs.WriterHandle ¬ NIL;
    lockedSL: HeapDefs.WriterHandle ¬ NIL;
    badList: HeapDefs.WriterHandle ¬ NIL;
    slHeader: SLDefs.SLHeader;
    -- Per-message counters, added into the module totals at the end.
    localCount, remoteCount, pendingCount, badCount, lockCount: CARDINAL ¬ 0;
    foreignCount, dlCount, skippedCount: CARDINAL ¬ 0;

    -- Adds "recipient" to badList, starting the list if necessary.  Does not
    -- touch badCount; the notFound arm of Deliver increments it separately.
    MakeBad: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF badList = NIL THEN badList ¬ HeapDefs.HeapStartWrite[temp];
      HeapDefs.HeapWriteRName[badList, recipient];
      END;

    -- Adds "recipient" to lockedSL and counts it.  On first use the list is
    -- started as an SLpending object whose header is slHeader with a NIL server.
    MakeLocked: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF lockedSL = NIL THEN
        BEGIN
        lockedSL ¬ HeapDefs.HeapStartWrite[SLpending];
        slHeader.server ¬ NIL;
        HeapDefs.HeapWriteData[lockedSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
        END;
      HeapDefs.HeapWriteRName[lockedSL, recipient];
      lockCount ¬ lockCount + 1;
      END;

    -- Adds "recipient" to pendingSL and counts it.  On first use the list is
    -- started as an SLpending object whose header is slHeader with a NIL server.
    MakePending: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF pendingSL = NIL THEN
        BEGIN
        pendingSL ¬ HeapDefs.HeapStartWrite[SLpending];
        slHeader.server ¬ NIL;
        HeapDefs.HeapWriteData[pendingSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
        END;
      HeapDefs.HeapWriteRName[pendingSL, recipient];
      pendingCount ¬ pendingCount + 1;
      END;

    HaveIDoneThisOne: SIGNAL [new: BodyDefs.RName] RETURNS [BOOLEAN] = CODE;
    -- This signal is used to prevent recursive loops when expanding
    -- recursively defined DL's.
    -- Each group expansion in progress installs a catch phrase that resumes
    -- with TRUE when "new" is equivalent to the group it is expanding.  The
    -- outermost catch phrase, in the main loop of Sort, resumes with FALSE.

    -- Delivers to a single recipient, expanding it recursively when the site
    -- cache reports it as a dl or a foreign list.  "done" is always FALSE.
    -- NOTE(review): presumably FALSE tells NameInfoDefs.Enumerate and
    -- HeapDefs.ReadRList to keep enumerating; confirm against those interfaces.
    -- One character per recipient goes to LogDefs.WriteChar:
    --   L mailbox written, M mailbox write refused (recipient put on the
    --   locked list), G dl, X foreign, F found on another server,
    --   P all servers down (pending), B not found (bad).
    Deliver: PROCEDURE [recipient: BodyDefs.RName] RETURNS [done: BOOLEAN] =
      BEGIN  -- deliver to one recipient, who may be a group --

      -- Looks the recipient up and, if it is local, writes the mailbox, all
      -- inside one ENTRY procedure of this monitor.  The ExpressMail group
      -- name is never looked up: it is reported as notFound.  For any
      -- result other than local, allDown or notFound, the server named in
      -- slHeader.received is sent a cache flush via TellOtherServer.
      DeliverLocal: ENTRY PROC RETURNS [info: SiteCacheDefs.RecipientInfo] =
        BEGIN  -- part of the cache-flush synchronization arrangements --
        info ¬
          IF String.EquivalentStrings[recipient, "ExpressMail­.ms"] THEN [
          notFound[]] ELSE SiteCacheDefs.FindMBXSite[recipient];
        WITH info SELECT FROM
          local =>
            IF MailboxDefs.MBXWrite[recipient, body, slHeader.created] THEN {
              LogDefs.WriteChar['L]; localCount ¬ localCount + 1}
            ELSE {LogDefs.WriteChar['M]; MakeLocked[recipient]};
          allDown, notFound => NULL;
          ENDCASE => TellOtherServer[slHeader.received, recipient];
        END;

      info: SiteCacheDefs.RecipientInfo = DeliverLocal[];
      done ¬ FALSE;
      WITH info SELECT FROM
        dl =>
          BEGIN
          -- Collect this group's bad members on a fresh list so they can be
          -- reported against the group, then restore the caller's list.
          oldBadList: HeapDefs.WriterHandle = badList;
          badList ¬ NIL;
          LogDefs.WriteChar['G];
          dlCount ¬ dlCount + 1;
          -- Ask the enclosing expansions whether this group is already
          -- being expanded; expand its members only if none says so.
          IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
            BEGIN
            NameInfoDefs.Enumerate[
              members, Deliver !
              HaveIDoneThisOne =>
                IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
            END
          ELSE skippedCount ¬ skippedCount + 1;
          IF badList # NIL THEN ReturnDefs.BadGroup[badList, body, recipient];
          badList ¬ oldBadList;
          NameInfoDefs.Close[members];
          END;
        foreign =>
          BEGIN
          LogDefs.WriteChar['X];
          -- Same recursion guard as the dl arm, but the members come from
          -- a heap reader and no separate bad list is kept.
          IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
            BEGIN
            HeapDefs.ReadRList[
              members, Deliver !
              HaveIDoneThisOne =>
                IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
            END
          ELSE skippedCount ¬ skippedCount + 1;
          HeapDefs.HeapEndRead[members];
          foreignCount ¬ foreignCount + 1;
          END;
        local => NULL -- done by DeliverLocal -- ;
        found =>
          BEGIN
          LogDefs.WriteChar['F];
          -- Start this server's forwarding list on first use; the copy of
          -- the header written to it names the server.
          IF server.SL = NIL THEN
            BEGIN
            server.SL ¬ HeapDefs.HeapStartWrite[SLforward];
            slHeader.server ¬ server;
            HeapDefs.HeapWriteData[server.SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
            END;
          HeapDefs.HeapWriteRName[server.SL, recipient];
          remoteCount ¬ remoteCount + 1;
          END;
        allDown => {LogDefs.WriteChar['P]; MakePending[recipient]};
        notFound =>
          BEGIN
          LogDefs.WriteChar['B];
          MakeBad[recipient];
          badCount ¬ badCount + 1;
          END;
        ENDCASE => ERROR;
      -- The loop calling this procedure may use lots of processor time:
      Process.Pause[1];
      END;

    -- Main body of Sort.
    BEGIN
    startTime: Time.Packed ¬ Time.Current[];
    reject: BOOLEAN;
    pendingSL ¬ NIL;
    badList ¬ NIL;
    BEGIN
    ended: BOOLEAN;
    used: CARDINAL;
    [ended, used] ¬ HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
    IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
    -- The whole message is rejected when either time limit has expired.
    reject ¬ ElapsedDays[LOOPHOLE[slHeader.created.time], longTermLimit]
      OR ElapsedDays[LOOPHOLE[slHeader.received.time], shortTermLimit];
    recipientLog.length ¬ 0;
    String.AppendString[recipientLog, "RecipientLog "L];
    ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created];
    String.AppendChar[recipientLog, ':];
    UNTIL ended DO
      recipient: BodyDefs.RName = [BodyDefs.maxRNameLength];
      ended ¬ HeapDefs.HeapReadRName[SL, recipient];
      NoteRecipient[recipient];
      IF reject THEN MakeBad[recipient]
      -- Outermost handler of the recursion guard: a top-level recipient
      -- has not been expanded before.
      ELSE [] ¬ Deliver[recipient ! HaveIDoneThisOne => RESUME [FALSE]];
      ENDLOOP;
    END;
    HeapDefs.HeapEndRead[SL];
    IF reject AND badList # NIL THEN
      BEGIN
      -- Timed-out message: hand every recipient to ReturnDefs.LongTerm and
      -- write a log entry.
      log: LONG STRING ¬ Heap.systemZone.NEW[StringBody[128]];
      ReturnDefs.LongTerm[badList, body];
      String.AppendString[log, "Long-term timeout on "L];
      ProtocolDefs.AppendTimestamp[log, slHeader.created];
      LogDefs.WriteLogEntry[log];
      Heap.systemZone.FREE[@log];
      END
    ELSE
      BEGIN
      -- Queues a server's forwarding list, if one was started, and clears it.
      EndServerSL: PROCEDURE [server: ServerDefs.ServerHandle] =
        BEGIN  -- this is executed inside the ServerInfo monitor --
        IF server.SL # NIL THEN
          BEGIN
          SLDefs.SLWrite[body, server.SL, forward];
          server.SL ¬ NIL;
          END;
        END;
      -- Locked recipients go to the mailbox queue, pending ones to the
      -- pending queue.
      IF lockedSL # NIL THEN SLDefs.SLWrite[body, lockedSL, mailbox];
      IF pendingSL # NIL THEN SLDefs.SLWrite[body, pendingSL, pending];
      ServerDefs.EnumerateAll[EndServerSL];
      IF badList # NIL THEN ReturnDefs.BadRecipients[badList, body];
      -- NOTE(review): this guard ignores foreignCount, dlCount and
      -- skippedCount.  Whenever it passes, at least one Add call below sets
      -- "called", so the "no recipients" text looks unreachable.  Also a
      -- message whose only recipients were groups that yielded no counted
      -- deliveries is neither logged nor added to the totals.  Confirm the
      -- intent before changing.
      IF pendingCount # 0 OR localCount # 0 OR remoteCount # 0 OR badCount # 0
        OR lockCount # 0 THEN
        BEGIN
        log: LONG STRING ¬ Heap.systemZone.NEW[StringBody[200]];
        called: BOOLEAN ¬ FALSE;
        -- Appends ", <count><text>" to the log line for a non-zero count.
        Add: PROC [count: CARDINAL, text: STRING] =
          BEGIN
          IF count # 0 THEN
            BEGIN
            String.AppendString[log, ", "L];
            String.AppendDecimal[log, count];
            String.AppendString[log, text];
            called ¬ TRUE;
            END;
          END;
        String.AppendString[log, "Delivered "L];
        ProtocolDefs.AppendTimestamp[log, slHeader.created];
        Add[localCount, " local"L];
        Add[remoteCount, " remote"L];
        Add[pendingCount, " pending"L];
        Add[badCount, " bad"L];
        Add[lockCount, " locked"L];
        Add[foreignCount, " foreign"L];
        Add[dlCount, " DLs"L];
        Add[skippedCount, " DLs skipped"L];
        IF ~called THEN String.AppendString[log, ", no recipients"L];
        LogDefs.WriteLogEntry[log];
        Heap.systemZone.FREE[@log];
        LogDefs.WriteLogEntry[recipientLog];
        -- pendingCount and lockCount have no module-level totals.
        messageTotal ¬ messageTotal + 1;
        localTotal ¬ localTotal + localCount;
        remoteTotal ¬ remoteTotal + remoteCount;
        badTotal ¬ badTotal + badCount;
        foreignTotal ¬ foreignTotal + foreignCount;
        dlTotal ¬ dlTotal + dlCount;
        skippedTotal ¬ skippedTotal + skippedCount;
        END;
      END;
    SLDefs.SLEndRead[handle];
    LogFinished[slHeader.created, startTime];
    END;
    END;  -- Sort

  -- Writes a "Processing <stamp> took <n> seconds" log entry when at least
  -- 30 seconds have passed since startTime; otherwise does nothing.
  LogFinished: PROC [stamp: BodyDefs.Timestamp, startTime: Time.Packed] =
    BEGIN
    log: LONG STRING;
    seconds: LONG CARDINAL ¬ Time.Current[]-startTime;
    IF seconds < 30 THEN RETURN;
    log ¬ Heap.systemZone.NEW[StringBody[128]];
    String.AppendString[log, "Processing "L];
    ProtocolDefs.AppendTimestamp[log, stamp];
    String.AppendString[log, " took "L];
    String.AppendLongDecimal[log, seconds];
    String.AppendString[log, " seconds"L];
    LogDefs.WriteLogEntry[log];
    Heap.systemZone.FREE[@log];
    END;

  -- Seconds in one day; the inner product is lengthened before the multiply by 24.
  daySecs: LONG CARDINAL = 24 * LONG[60 * 60];

  -- Returns FALSE whenever timeoutDisabled is set.
  ElapsedDays: PROC [from: BodyDefs.PackedTime, interval: CARDINAL]
    RETURNS [tooMany: BOOLEAN] =
    -- Determines whether more than "interval" working days have elapsed
    -- since "from".
    BEGIN
    -- For every five working days, we allow seven elapsed days --
    -- For under five working days, must correct for weekends --
    now: Time.Packed = Time.Current[];
    -- Despite its name, completeWeeks is a number of days: seven for each
    -- complete group of five working days in "interval".
    completeWeeks: CARDINAL = (interval / 5) * 7;
    partialWeek: CARDINAL = interval MOD 5;
    -- First estimate of the deadline, before any weekend correction.
    limit: LONG CARDINAL ¬ from + (completeWeeks + partialWeek) * daySecs;
    IF timeoutDisabled OR now < limit THEN RETURN[FALSE]  -- optimize most calls
    ELSE
      BEGIN
      limitDay: CARDINAL = Time.Unpack[LOOPHOLE[limit, Time.Packed]].weekday;
      -- NOTE(review): these constants imply weekday numbering with Monday
      -- as 0; confirm against Time.Unpack.
      saturday: CARDINAL = 5;
      sunday: CARDINAL = 6;
      SELECT limitDay FROM
        saturday =>
          SELECT partialWeek FROM
            0 => NULL;  -- any time in the weekend is ok --
            ENDCASE => limit ¬ limit + 2 * daySecs;
        sunday =>
          -- NOTE(review): the arm for 1 and the ENDCASE arm add the same
          -- two days, so the separate arm currently changes nothing.
          -- Confirm whether a different adjustment was intended.
          SELECT partialWeek FROM
            0 => NULL;  -- any time in the weekend is ok --
            1 =>  -- limit should be set to end of following Monday --
              limit ¬ limit + 2 * daySecs;
            ENDCASE => limit ¬ limit + 2 * daySecs;
        ENDCASE =>
          -- Deadline falls on a weekday numbered below the partial-week
          -- length: push it out by the difference, at most two days.
          IF limitDay < partialWeek THEN
            limit ¬ limit + MIN[partialWeek - limitDay, 2] * daySecs;
      RETURN[now > limit]
      END
    END;

  -- Recomputes timeoutDisabled: it stays TRUE only while fewer than six
  -- hours have passed since LogPrivateDefs.startUpTime.
  SeeIfTimeToStartTimeouts: PROCEDURE =
    BEGIN  -- check uptime to see if greater than 6 hours.
    timeoutDisabled ¬ Time.Current[] - LogPrivateDefs.startUpTime < 6 * 60 * 60
    END;

  -- FALSE during the synchronous start-up passes made by ReadInput2, which
  -- sets it to TRUE before forking the permanent SortQueue processes.
  forever: BOOLEAN ¬ FALSE;

  -- Drains one queue by calling Sort on each entry.
  -- While "forever" is FALSE it makes a single pass over the entries counted
  -- at the start, and returns early if PolicyDefs.CheckOperation refuses the
  -- operation.  While "forever" is TRUE it waits for the queue to become
  -- non-empty and repeats indefinitely.
  -- Only the input, pending and mailbox queues are accepted; any other
  -- value raises ERROR.
  SortQueue: PROC [queue: SLDefs.SLQueue] =
    BEGIN
    -- The input queue competes for the sorter as "normal"; the others as "other".
    which: Class = SELECT queue FROM input => normal, ENDCASE => other;
    operation: PolicyDefs.Operation =
      SELECT queue FROM
        input => readInput,
        pending => readPending,
        mailbox => readMailbox,
        ENDCASE => ERROR;
    DO
      IF timeoutDisabled THEN SeeIfTimeToStartTimeouts[];
      IF forever THEN SLDefs.WaitForNonEmpty[queue];
      SELECT queue FROM
        input => NULL;
        pending => PolicyDefs.ReadPendingPause[];
        mailbox => MailboxDefs.WaitForUnlocked[];
        ENDCASE => ERROR;
      ClaimSorter[which];
      THROUGH [0..SLDefs.GetCount[queue]) DO
        SL: HeapDefs.ReaderHandle;
        handle: SLDefs.SLReadHandle;
        body: HeapDefs.ObjectNumber;
        -- The input queue always gives up the sorter between messages.  The
        -- other queues give it up only when the policy check fails.
        IF queue = input OR ~PolicyDefs.CheckOperation[operation] THEN
          BEGIN
          ReleaseSorter[which];
          IF forever THEN PolicyDefs.WaitOperation[operation]
          -- The sorter has already been released on this early return.
          ELSE {IF ~PolicyDefs.CheckOperation[operation] THEN RETURN};
          ClaimSorter[which];
          END;
        [handle, body, SL] ¬ SLDefs.SLStartRead[queue];
        Sort[handle, body, SL];
        PolicyDefs.EndOperation[operation];
        ENDLOOP;
      ReleaseSorter[which];
      IF ~forever THEN EXIT;
      ENDLOOP;
    END;

  -- Counts processes of class "other" that are waiting for or holding the
  -- sorter: incremented in ClaimSorter, decremented in ReleaseSorter.
  otherQueueCount: CARDINAL ¬ 0;  -- give priority to non input queues --
  -- Written by ClaimSorter but not read anywhere in this part of the module.
  waiters: CARDINAL ¬ 0;  -- number of processes wanting the sorter --
  sorterInUse: BOOLEAN ¬ FALSE;  -- mutual exclusion on calls of "Sort" --
  -- Broadcast by ReleaseSorter; its timeout is disabled in ReadInput1.
  sorterCond: CONDITION;
  Class: TYPE = {normal, other};  -- class = other has priority --

  -- Blocks until the caller may run Sort, then marks the sorter in use.
  -- A "normal" caller also waits while any "other" caller is waiting for or
  -- holding the sorter.
  ClaimSorter: ENTRY PROC [which: Class] =
    BEGIN
    waiters ¬ waiters + 1;
    IF which = other THEN otherQueueCount ¬ otherQueueCount + 1;
    WHILE sorterInUse OR (which # other AND otherQueueCount # 0) DO
      WAIT sorterCond ENDLOOP;
    sorterInUse ¬ TRUE;
    waiters ¬ waiters - 1;
    END;

  -- Gives up the sorter and wakes every waiter.  "which" must be the class
  -- that was passed to the matching ClaimSorter call, so that
  -- otherQueueCount stays balanced.
  ReleaseSorter: ENTRY PROC [which: Class] =
    BEGIN
    sorterInUse ¬ FALSE;
    IF which = other THEN otherQueueCount ¬ otherQueueCount - 1;
    BROADCAST sorterCond;
    END;

  -- Endless loop that moves messages from the express queue to the input
  -- queue.  Each message is held until more than an hour has passed since
  -- its header's "created" time, checking once a minute.  Raises BadSL if
  -- the header cannot be read in full.
  USPS: PROC =
    BEGIN
    DO
      used: CARDINAL;
      handle: SLDefs.SLReadHandle;
      body: HeapDefs.ObjectNumber;
      SL: HeapDefs.ReaderHandle;
      slHeader: SLDefs.SLHeader;
      [handle, body, SL] ¬ SLDefs.SLStartRead[express];
      -- Only the header is needed; the reader is closed straight away.
      [, used] ¬ HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
      IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
      HeapDefs.HeapEndRead[SL];
      UNTIL (Time.Current[]-slHeader.created.time) > 60*60 DO
        -- Delay all Express mail at least an hour
        PolicyDefs.Wait[mins: 1]; ENDLOOP;
      PolicyDefs.WaitOperation[readExpress];
      SLDefs.SLTransfer[handle, input];
      PolicyDefs.EndOperation[readExpress];
      LogExpressMotion[slHeader.created];
      ENDLOOP;
    END;

  -- Writes a "<stamp> moved from Express to Input Queue" log entry.
  LogExpressMotion: PROC [stamp: BodyDefs.Timestamp] =
    BEGIN
    log: LONG STRING ¬ Heap.systemZone.NEW[StringBody[128]];
    ProtocolDefs.AppendTimestamp[log, stamp];
    String.AppendString[log, " moved from Express to Input Queue"L];
    LogDefs.WriteLogEntry[log];
    Heap.systemZone.FREE[@log];
    END;

  -- The synchronization for sitecache flushing and possible remailing is
  -- complicated.  The cause of the complication is that "Deliver" might
  -- have found someone to be local but not placed the message in the
  -- mailbox, at the time that we flush the cache.  This is prevented by
  -- the entry procedures "DeliverLocal" and "FlushCache".
-- Exported procedures: cache flush with optional remail, and two-stage start-up.

  -- Drops "who" from the site cache while holding this monitor's lock, then
  -- asks the site cache whether the mailbox has to be remailed and, unless
  -- the answer is stillLocal, calls MailboxDefs.Remail.
  FlushCacheAndRemail: PUBLIC PROC [who: BodyDefs.RName] = {
    -- Only the flush itself runs as an ENTRY procedure.
    FlushCache: ENTRY PROC = BEGIN SiteCacheDefs.SingleFlush[who] END;
    verdict: SiteCacheDefs.RemailInfo;
    FlushCache[];
    verdict ¬ SiteCacheDefs.NeedToRemail[who];
    -- Nothing further to do when the site cache answers stillLocal.
    IF verdict = stillLocal THEN RETURN;
    MailboxDefs.Remail[who: who, valid: verdict = remail];
    };

  -- First start-up stage: disables the timeout on the sorter condition
  -- variable, creates the Pup buffer pool (three send buffers, no receive
  -- buffers) and registers six of the running totals for display.
  ReadInput1: PUBLIC PROC = {
    Process.DisableTimeout[@sorterCond];
    bufferPool ¬ Buffer.MakePool[send: 3, receive: 0];
    LogDefs.DisplayNumber["Local recipients"L, [long[@localTotal]]];
    LogDefs.DisplayNumber["Remote recipients"L, [long[@remoteTotal]]];
    LogDefs.DisplayNumber["Bad recipients"L, [long[@badTotal]]];
    LogDefs.DisplayNumber["Foreign recipients"L, [long[@foreignTotal]]];
    LogDefs.DisplayNumber["DLs expanded"L, [long[@dlTotal]]];
    LogDefs.DisplayNumber["Nested DLs skipped"L, [long[@skippedTotal]]];
    };

  -- Second start-up stage: one synchronous pass over the mailbox queue and
  -- then the input queue, after which "forever" is set and four detached
  -- processes are forked: one SortQueue each for the input, mailbox and
  -- pending queues, plus USPS.
  ReadInput2: PUBLIC PROC = {
    -- empty queues once synchronously, to get us off the ground
    SortQueue[queue: mailbox];
    SortQueue[queue: input];
    -- Must be set before the forks so that the new processes loop indefinitely.
    forever ¬ TRUE;
    Process.Detach[FORK SortQueue[queue: input]];
    Process.Detach[FORK SortQueue[queue: mailbox]];
    Process.Detach[FORK SortQueue[queue: pending]];
    Process.Detach[FORK USPS[]];
    };

  END.

log:
August 84 - blh:	Klamath update
15-Aug-84 10:20:13 - blh:	disable timeouts for first 6 hours after startup.