Source code for jps.security

import os

import zmq
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator


def load_and_set_key(zmq_socket, key_path):
    public, secret = zmq.auth.load_certificate(key_path)
    zmq_socket.curve_secretkey = secret
    zmq_socket.curve_publickey = public


[docs]class Authenticator(object): _authenticators = {} @classmethod
[docs] def instance(cls, public_keys_dir): '''Please avoid create multi instance''' if public_keys_dir in cls._authenticators: return cls._authenticators[public_keys_dir] new_instance = cls(public_keys_dir) cls._authenticators[public_keys_dir] = new_instance return new_instance
def __init__(self, public_keys_dir): self._auth = ThreadAuthenticator(zmq.Context.instance()) self._auth.start() self._auth.allow('*') self._auth.configure_curve(domain='*', location=public_keys_dir)
[docs] def set_server_key(self, zmq_socket, server_secret_key_path): '''must call before bind''' load_and_set_key(zmq_socket, server_secret_key_path) zmq_socket.curve_server = True
[docs] def set_client_key(self, zmq_socket, client_secret_key_path, server_public_key_path): '''must call before bind''' load_and_set_key(zmq_socket, client_secret_key_path) server_public, _ = zmq.auth.load_certificate(server_public_key_path) zmq_socket.curve_serverkey = server_public
[docs] def stop(self): self._auth.stop()
[docs]def create_certificates(keys_dir='certificates'): if not os.path.exists(keys_dir): os.mkdir(keys_dir) server_public_file, server_secret_file = zmq.auth.create_certificates( keys_dir, "server") client_public_file, client_secret_file = zmq.auth.create_certificates( keys_dir, "client")
if __name__ == '__main__': keys_dir = 'certificates' import sys if len(sys.argv) > 1: keys_dir = sys.argv[1] create_certificates(keys_dir)