Source code for jps.service

import zmq
from zmq.utils.strtypes import cast_bytes
import threading
from .common import DEFAULT_RES_PORT
from .common import DEFAULT_REQ_PORT
from .env import get_master_host


[docs]class ServiceServer(object): ''' Example: >>> def callback(req): ... return 'req = {req}'.format(req=req) ... >>> service = jps.ServiceServer(callback) >>> service.spin() ''' def __init__(self, callback, host=None, res_port=DEFAULT_RES_PORT): if host is None: host = get_master_host() context = zmq.Context() self._socket = context.socket(zmq.REP) self._socket.connect( 'tcp://{host}:{port}'.format(host=host, port=res_port)) self._callback = callback self._thread = None
[docs] def spin(self, use_thread=False): '''call callback for all data forever (until \C-c) :param use_thread: use thread for spin (do not block) ''' if use_thread: if self._thread is not None: raise 'spin called twice' self._thread = threading.Thread(target=self._spin_internal) self._thread.setDaemon(True) self._thread.start() else: self._spin_internal()
def _spin_internal(self): while True: self.spin_once()
[docs] def spin_once(self): request = self._socket.recv() self._socket.send(cast_bytes(self._callback(request)))
[docs]class ServiceClient(object): def __init__(self, host=None, req_port=DEFAULT_REQ_PORT): if host is None: host = get_master_host() context = zmq.Context() self._socket = context.socket(zmq.REQ) self._socket.connect( 'tcp://{host}:{port}'.format(host=host, port=req_port))
[docs] def call(self, request): self._socket.send(request) return self._socket.recv()
__call__ = call