import json
import threading
import time
from .common import Error
from .publisher import Publisher
from .subscriber import Subscriber
from .service import ServiceServer
from .service import ServiceClient
class UploadSingleTopicBridge(object):
def __init__(self, topic, remote_host=None, remote_pub_port=None):
self._local_subscriber = Subscriber(topic, self.local_to_remote)
self._remote_publisher = Publisher(topic, host=remote_host,
pub_port=remote_pub_port)
def spin(self):
self._local_subscriber.spin(use_thread=True)
def local_to_remote(self, msg):
self._remote_publisher.publish(msg)
class DownloadSingleTopicBridge(object):
def __init__(self, topic, remote_host=None, remote_sub_port=None):
self._local_publisher = Publisher(topic)
self._remote_subscriber = Subscriber(topic, self.remote_to_local,
host=remote_host,
sub_port=remote_sub_port)
def spin(self):
self._remote_subscriber.spin(use_thread=True)
def remote_to_local(self, msg):
self._local_publisher.publish(msg)
[docs]class Bridge(object):
def __init__(
self, upload_topic_names, download_topic_names, remote_host=None,
remote_pub_port=None, remote_sub_port=None):
'''
Pub/Sub in different jps network
upload_topic_names and download_topic_names should not
contain same names. It causes infinity loop.
'''
if len(set(set(upload_topic_names) & set(download_topic_names))) > 0:
raise Error('upload_topic_names and download_topic_names should not' +
'contain same names')
self._bridges = []
for topic in upload_topic_names:
self._bridges.append(UploadSingleTopicBridge(
topic, remote_host, remote_pub_port=remote_pub_port))
for topic in download_topic_names:
self._bridges.append(DownloadSingleTopicBridge(
topic, remote_host, remote_sub_port=remote_sub_port))
[docs] def spin(self):
for b in self._bridges:
b.spin()
class BridgeServiceBase(object):
PAYLOAD_KEY = 'bridge_topics'
def __init__(self, topics, sub_port=None, pub_port=None):
'''convert subscribed topic to service call and publish the return value as topic'''
self._received_data = {}
self._subscribers = {}
self._publishers = {}
self._pub_port = pub_port
for topic in topics:
self._subscribers[topic] = Subscriber(
topic, self.callback_with_name, sub_port=sub_port)
self._subscribers[topic].spin(use_thread=True)
def callback_with_name(self, msg, topic_name):
self._received_data[topic_name] = msg
def publish_topic_data(self, topic_data):
for topic, data in topic_data.iteritems():
if topic not in self._publishers:
self._publishers[topic] = Publisher(
topic, pub_port=self._pub_port)
time.sleep(0.1)
self._publishers[topic].publish(data)
[docs]class BridgeServiceClient(BridgeServiceBase):
def __init__(self, upload_topics, frequency=10.0, sub_port=None, pub_port=None,
host=None, req_port=None, use_security=False):
BridgeServiceBase.__init__(
self, upload_topics, sub_port=sub_port, pub_port=pub_port)
self._service_client = ServiceClient(
host=host, req_port=req_port, use_security=use_security)
self._thread = None
self._frequency = frequency
[docs] def spin(self, use_thread=False):
if use_thread:
if self._thread is not None:
raise 'spin called twice'
self._thread = threading.Thread(target=self._spin_internal)
self._thread.setDaemon(True)
self._thread.start()
else:
self._spin_internal()
def _spin_internal(self):
sleep_sec = 1.0 / self._frequency
while True:
self.spin_once()
time.sleep(sleep_sec)
[docs] def spin_once(self):
ret = self._service_client(json.dumps(
{BridgeServiceBase.PAYLOAD_KEY: self._received_data}))
topic_data = json.loads(ret)
# {"led": {"r": 5, "g": 10, "b": 255}, ...}
self.publish_topic_data(topic_data)
[docs]class BridgeServiceServer(BridgeServiceBase):
def __init__(self, download_topics, sub_port=None, pub_port=None,
res_port=None, use_security=False):
BridgeServiceBase.__init__(
self, download_topics, sub_port=sub_port, pub_port=pub_port)
self._service_server = ServiceServer(
self.callback, res_port=res_port, use_security=use_security)
[docs] def callback(self, request):
req_dict = json.loads(request)
if BridgeServiceBase.PAYLOAD_KEY not in req_dict:
return ''
self.publish_topic_data(req_dict[BridgeServiceBase.PAYLOAD_KEY])
return json.dumps(self._received_data)
[docs] def spin(self, use_thread=False):
self._service_server.spin(use_thread=use_thread)
[docs] def close(self):
self._service_server.close()