cligen/procpool

This module provides a facility like Python's multiprocessing module, but it is less automagic & does little error handling. MSlice is used as the reply type to avoid copying when replies are large. Auto-pack/unpack logic could mimic Python's for x in p.imap_unordered more closely. While only at the proof-of-concept stage, the example (frames|eval)(0term|LenPfx) programs work ok.

In more detail, this module abstracts the orchestration activity of a parent striping work over a pool of classic kid coprocesses { i.e. a double-pipe to each kid; How classic? Ksh88 had coproc. } Kids are user code here & are in-out filters needing only request-reply framing. The parent loops over work generation passed in initProcPool, writes request frames to each kid, reads & frames whatever replies arrive until all work & replies are done. The core of this is ~80 lines of code. This setup is least awkward when inputs & outputs are both small & an easy message protocol exists (like examples/).
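
As a rough illustration of the double-pipe arrangement described above, here is a minimal sketch that uses only std/posix and std/strutils rather than procpool itself. The names upcaseKid and roundTrip are hypothetical, there is just one kid, and error handling is mostly omitted; procpool instead manages a whole pool of such kids.

```nim
import std/[posix, strutils]

proc upcaseKid(r, w: cint) =
  ## Kid filter: copy request bytes to the reply pipe, upper-cased, until EOF.
  var c: char
  while read(r, c.addr, 1) == 1:
    c = toUpperAscii(c)
    discard write(w, c.addr, 1)

proc roundTrip(req: string): string =
  ## Parent side: spawn one kid, send one small request, collect the reply.
  var toKid, fromKid: array[2, cint]      # [0] = read end, [1] = write end
  doAssert pipe(toKid) == 0 and pipe(fromKid) == 0
  let pid = fork()
  doAssert pid >= 0
  if pid == 0:                            # in the kid
    discard close(toKid[1])               # kid never writes requests
    discard close(fromKid[0])             # kid never reads replies
    upcaseKid(toKid[0], fromKid[1])
    quit(0)
  discard close(toKid[0])                 # parent keeps only its own ends
  discard close(fromKid[1])
  discard write(toKid[1], cast[pointer](req.cstring), req.len)
  discard close(toKid[1])                 # EOF tells the kid to finish
  var buf: array[256, char]
  while true:
    let n = read(fromKid[0], buf[0].addr, buf.len)
    if n <= 0: break                      # EOF (or error): kid is done
    for i in 0 ..< n: result.add buf[i]
  discard close(fromKid[0])
  var status: cint
  discard waitpid(pid, status, 0)         # reap the kid

doAssert roundTrip("hello\n") == "HELLO\n"
```

Writing the whole request before reading any reply is only safe here because the request is far smaller than a pipe buffer; with larger traffic the parent must interleave writes & reads.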

Concretely, this snippet computes & formats line lengths in parallel kids:

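import cligen/[procpool, mslice, osUt], os
proc work(r, w: cint) =                   # runs in each kid: r = requests in, w = replies out
  let o = w.open(fmWrite, IOLBF)          # line-buffered File over the reply fd
  # Reply with one line (the length) per request line.
  for s in r.open(fmRead, IOLBF).lines: o.write $s.len & "\n"
var pp = initProcPool(work, framesLines)  # replies are framed as lines
pp.evalLines commandLineParams(), echo    # requests are the arguments; echo runs in the parent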

Instead of *Lines, you may like *0term as in examples/only.nim, *LenPfx as in examples/grl.nim, or *Ob as in examples/gl.nim. Interleaved output is not a problem since reply processing (echo above) is all in the parent. There is no need for locking (unless work sets up & uses shared resources).
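
To make the difference between these framings concrete, here is a small sketch of what the bytes on the pipe might look like. The helpers frameLine, frame0term, frameLenPfx & unframeLenPfx are hypothetical, not part of cligen/procpool, and the length prefix is assumed to be a native-endian int, as the [int, value] description of framesLenPfx suggests.

```nim
# Hypothetical helpers for illustration only; not part of cligen/procpool.
proc frameLine(s: string): string = s & "\n"    # *Lines style: newline ends a frame
proc frame0term(s: string): string = s & "\0"   # *0term style: NUL ends a frame

proc frameLenPfx(s: string): string =           # *LenPfx style: [int, value]
  var n = s.len
  result = newString(sizeof(n))                 # room for the length prefix
  copyMem(result[0].addr, n.addr, sizeof(n))    # assumed native-endian int
  result.add s

proc unframeLenPfx(buf: string): seq[string] =
  ## Split a byte stream of [int, value] records back into values.
  var off = 0
  while off + sizeof(int) <= buf.len:
    var n: int
    copyMem(n.addr, buf[off].unsafeAddr, sizeof(int))
    off += sizeof(int)
    if n < 0 or off + n > buf.len: break        # truncated or corrupt record
    result.add buf[off ..< off + n]
    off += n

# A value containing '\n' survives length-prefix framing intact.
let replies = @["5", "12", "a\nb"]
var stream = ""
for r in replies: stream.add frameLenPfx(r)
doAssert unframeLenPfx(stream) == replies
```

Terminator-based framing is simplest when values can never contain the terminator; length prefixes cost a few bytes per frame but allow arbitrary binary payloads.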

Many prog.lang people seem unaware that processes & threads are distinguished mostly by safe vs. unsafe defaults { see Linux clone after Plan9 rfork }. Memory can be opt-in-shared via memfiles. RAM files can avoid device IO & copying (other than short paths), which reduces the differences to the awkwardness of pointers becoming relative to named files. Opt-into-risk designs are always "more work on-purpose" for client code. Additional benefits are bought here: seamless persistent data, well-separated resource limits/state/etc. YMMV, but pre-spawned proc pools switch about as fast as threads. Procs can be slower to build sharing BUT faster from less contention { e.g. if kids do mmap IO, thread-sibs contend for VM edits in fast (un)map cycles, but proc-sibs have uncontended, private VM }. One case's "awkward" is another's "expresses vital ideas". Good ecosystems should have libs for both (& also for files as named arenas).
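
The opt-in sharing via memfiles mentioned above might look like the following sketch, which is independent of procpool. It assumes a POSIX system and relies on std/memfiles mapping writable files as shared, which is worth checking on your platform. The name sharedSlotDemo & the file name are hypothetical; a path on a RAM-backed file system would avoid device IO.

```nim
import std/[memfiles, os, posix]

proc sharedSlotDemo(path: string): int =
  ## Map a small file, fork, let the kid write into it, read it in the parent.
  var mf = memfiles.open(path, mode = fmWrite, newFileSize = sizeof(int))
  let slot = cast[ptr int](mf.mem)   # the "pointer" is relative to a named file
  slot[] = 0
  let pid = fork()
  doAssert pid >= 0
  if pid == 0:                       # kid: only this mapped region is shared
    slot[] = 42
    quit(0)
  var status: cint
  discard waitpid(pid, status, 0)    # waiting for the kid orders the accesses
  result = slot[]
  mf.close()
  removeFile(path)

let seen = sharedSlotDemo(getTempDir() / "procpool_shared_demo.mmap")
doAssert seen == 42                  # the kid's write is visible in the parent
```

Everything else in the kid stays private by default; only the mapped file is shared, which is the safe-default trade-off the paragraph above describes.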

Types

Filter = object
  fd0*, fd1*: cint           ## PARENT VIEW of request|input & reply|output file handles
  buf*: string               ## current read buffer
  off*: int                  ## byte offset into `buf` to read new data into
  done*: bool                ## flag indicating completion
  aux*: int                  ## general purpose int-sized client data, e.g. obsz
Abstract coprocess filter read|writing req|rep fd's
Frames = proc (f: var Filter): iterator (): MSlice
ProcPool = object
A process pool to do work on multiple cores

Procs

proc close(pp: ProcPool; kid: int) {.raises: [], tags: [], forbids: [].}
proc frames0(f: var Filter): (iterator (): MSlice) {.
    deprecated: "use `frames0term`", raises: [], tags: [], forbids: [].}
Deprecated: use `frames0term`
proc frames0term(f: var Filter): iterator (): MSlice {.raises: [], tags: [],
    forbids: [].}
A reply frames iterator for workers writing '\0'-terminated results.
proc framesLenPfx(f: var Filter): iterator (): MSlice {.raises: [], tags: [],
    forbids: [].}
A reply frames iterator for worker procs writing length-prefixed [int, value] results.
proc framesLines(f: var Filter): iterator (): MSlice {.raises: [], tags: [],
    forbids: [].}
A reply frames iterator for workers writing '\n'-terminated results.
proc framesOb(f: var Filter): iterator (): MSlice {.raises: [], tags: [],
    forbids: [].}
A reply frames iterator for worker procs writing fixed-size binary objects.
proc initProcPool(work: proc (r, w: cint); frames: Frames; jobs = 0; aux = 0;
                  toR = to0; toW = to0; raiseCtrlC = false; bufSz = 8192): ProcPool {.
    noinit, raises: [Exception, OSError], tags: [WriteIOEffect, RootEffect],
    forbids: [].}
proc len(pp: ProcPool): int {.inline, raises: [], tags: [], forbids: [].}
proc noop(s: MSlice) {.raises: [], tags: [], forbids: [].}
Convenience no-op for eval*.

Iterators

iterator finalReplies(pp: var ProcPool): MSlice {.raises: [Exception],
    tags: [RootEffect], forbids: [].}
iterator readyReplies(pp: var ProcPool): MSlice {.raises: [Exception],
    tags: [RootEffect], forbids: [].}

Templates

template eval(pp, reqGen, onReply: untyped) {.deprecated: "use `eval0term`".}
Deprecated: use `eval0term`
template eval0term(pp, reqGen, onReply)
template evalLenPfx(pp, reqGen, onReply)
template evalLines(pp, reqGen, onReply)
template evalOb(pp, reqGen, onReply)
template evalp(pp, reqGen, onReply: untyped) {.deprecated: "use `evalLenPfx`".}
Deprecated: use `evalLenPfx`
template wrReq(fds, i0, pp, wr, rq): untyped
Internal to eval*; Use those. Evaluates to true if request was written.