Source code for jps.tools

import argparse
import datetime
import jps
import json
import os
import signal
import sys
import time


def pub(topic_name, json_msg, repeat_rate=None, host=jps.env.get_master_host(), pub_port=jps.DEFAULT_PUB_PORT):
    '''Publish ``json_msg`` on ``topic_name``, either once or repeatedly.

    :param topic_name: name of the topic
    :param json_msg: data to be published
    :param repeat_rate: if None, publishes once. Otherwise the message is
        re-published at this rate in [Hz] until a KeyboardInterrupt arrives.
    :param host: host handed to ``jps.Publisher``
    :param pub_port: publisher port handed to ``jps.Publisher``
    '''
    publisher = jps.Publisher(topic_name, host=host, pub_port=pub_port)
    # Short pause between creating the publisher and the first publish
    # (presumably to give the connection time to come up).
    time.sleep(0.1)

    if repeat_rate is None:
        publisher.publish(json_msg)
        return

    try:
        while True:
            publisher.publish(json_msg)
            time.sleep(1.0 / repeat_rate)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the repeat loop.
        pass
def echo(topic_name, num_print=None, out=sys.stdout, host=jps.env.get_master_host(), sub_port=jps.DEFAULT_SUB_PORT):
    '''Write every message received on ``topic_name`` to ``out``.

    :param topic_name: name of the topic to subscribe to
    :param num_print: stop after this many messages; None means keep going
        until a KeyboardInterrupt arrives
    :param out: object with a ``write`` method that receives one line per message
    :param host: host handed to ``jps.Subscriber``
    :param sub_port: subscriber port handed to ``jps.Subscriber``
    '''
    class CountingWriter(object):
        '''Writes each message as one line and remembers how many were written.'''

        def __init__(self, stream):
            self.count = 0
            self._stream = stream

        def write_line(self, msg):
            self._stream.write('{}\n'.format(msg))
            self.count += 1

    writer = CountingWriter(out)
    sub = jps.Subscriber(
        topic_name, writer.write_line, host=host, sub_port=sub_port)

    try:
        while True:
            if num_print is not None and writer.count >= num_print:
                break
            sub.spin_once()
            # Tiny sleep between polls so the loop does not spin flat out.
            time.sleep(0.0001)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop an unbounded echo.
        pass
def show_list(timeout_in_sec, out=sys.stdout, host=jps.env.get_master_host(), sub_port=jps.DEFAULT_SUB_PORT):
    '''Listen on every topic for a while, then print the topic names seen.

    :param timeout_in_sec: how long to listen, in seconds
    :param out: object with a ``write`` method that receives one name per line
    :param host: host handed to ``jps.Subscriber``
    :param sub_port: subscriber port handed to ``jps.Subscriber``
    '''
    class TopicCollector(object):
        '''Accumulates the distinct topic names passed to ``callback``.'''

        def __init__(self):
            self.seen = set()

        def callback(self, msg, topic):
            # Only the topic name matters here; the payload is ignored.
            self.seen.add(topic)

    collector = TopicCollector()
    sub = jps.Subscriber('*', collector.callback, host=host, sub_port=sub_port)

    poll_interval = 0.01
    polls_left = int(timeout_in_sec / poll_interval)
    while polls_left > 0:
        sub.spin_once(poll_interval)
        time.sleep(0.001)  # for context switch
        polls_left -= 1

    # Names are printed in sorted order, one per line.
    for topic in sorted(collector.seen):
        out.write('{}\n'.format(topic))
def record(file_path, topic_names=None, host=jps.env.get_master_host(), sub_port=jps.DEFAULT_SUB_PORT):
    '''Record topic data to a file until the subscriber stops or a signal arrives.

    The file is written as a JSON object: a ``header`` entry (topic names and
    start date) followed by a ``data`` array holding one
    ``[timestamp, "<topic> <msg>"]`` entry per line.

    :param file_path: path of the output file (opened for writing)
    :param topic_names: topics to record; None or empty records every topic
    :param host: host handed to ``jps.Subscriber``
    :param sub_port: subscriber port handed to ``jps.Subscriber``
    '''
    # None replaces the former mutable ``[]`` default; both mean "record all".
    if topic_names is None:
        topic_names = []

    class TopicRecorder(object):
        '''Streams received messages into the output file.'''

        def __init__(self, file_path, topic_names):
            self._topic_names = topic_names
            self._file_path = file_path
            # Serialise the header before opening the file so that a
            # serialisation error cannot leave an open handle behind.
            header = {
                'topic_names': topic_names,
                'start_date': str(datetime.datetime.today()),
            }
            header_string = json.dumps({'header': header})
            # Drop the closing brace: the "data" array is appended to the
            # same JSON object and close() writes the final ']}'.
            tail_removed_header = header_string[0:-1]
            self._output = open(self._file_path, 'w')
            signal.signal(signal.SIGINT, self._handle_signal)
            signal.signal(signal.SIGTERM, self._handle_signal)
            self._output.write(tail_removed_header + ',\n')
            self._output.write(' "data": [\n')
            self._has_no_data = True

        def callback(self, msg, topic):
            if self._output.closed:
                return
            # An empty topic list means every topic is recorded.
            if self._topic_names and topic not in self._topic_names:
                return
            # Entries are separated by ',\n'; the first one has no separator.
            if self._has_no_data:
                self._has_no_data = False
            else:
                self._output.write(',\n')
            raw_msg = '{topic} {msg}'.format(topic=topic, msg=msg)
            self._output.write(json.dumps([time.time(), raw_msg]))

        def close(self):
            # Safe to call more than once: only the first call writes the footer.
            if not self._output.closed:
                self._output.write('\n]}')
                self._output.close()

        def _handle_signal(self, signum, frame):
            self.close()
            sys.exit(0)

    writer = TopicRecorder(file_path, topic_names)
    try:
        sub = jps.Subscriber('*', writer.callback, host=host, sub_port=sub_port)
        sub.spin()
    finally:
        # Always terminate the JSON document and release the file, even when
        # creating the subscriber or spinning raises.
        writer.close()
def play(file_path, host=jps.env.get_master_host(), pub_port=jps.DEFAULT_PUB_PORT):
    '''Replay the data recorded by record().

    Each entry is published with the same spacing in time as the recorded
    timestamps.

    :param file_path: path of a file written by record()
    :param host: host handed to ``jps.Publisher``
    :param pub_port: publisher port handed to ``jps.Publisher``
    '''
    pub = jps.Publisher('*', host=host, pub_port=pub_port)
    # Short pause between creating the publisher and the first publish
    # (presumably to give the connection time to come up).
    time.sleep(0.2)
    last_time = None
    print('start publishing file {}'.format(file_path))
    with open(file_path, 'r') as f:
        # record() writes the header on the first two lines; skip them.
        f.readline()
        f.readline()
        for line in f:
            if line.startswith(']}'):
                break
            entry = line.rstrip(',\n')
            # A recording without any message has an empty line between
            # '"data": [' and ']}'; skip it instead of feeding '' to json.
            if not entry.strip():
                continue
            publish_time, raw_msg = json.loads(entry)
            if last_time is not None:
                # time.sleep() rejects negative values, which would occur if
                # the recorded timestamps are not monotonic.
                time.sleep(max(0.0, publish_time - last_time))
            pub.publish(raw_msg.rstrip())
            last_time = publish_time
    # NOTE: message text (including its spelling) is kept as originally emitted.
    print('fnished')
def topic_command():
    '''Command line entry point for jps.

    Parses ``sys.argv`` and runs one of the sub-commands ``pub``, ``echo``,
    ``list``, ``record`` or ``play``. Prints the help text when no known
    sub-command was selected.
    '''
    parser = argparse.ArgumentParser(description='json pub/sub tool')
    # Shared option sets, reused as parents of the sub-command parsers.
    pub_common_parser = jps.ArgumentParser(subscriber=False, add_help=False)
    sub_common_parser = jps.ArgumentParser(publisher=False, add_help=False)
    command_parsers = parser.add_subparsers(dest='command', help='command')

    def add_command(name, help_text, common_parser):
        # Register one sub-command that inherits the given shared options.
        return command_parsers.add_parser(
            name, help=help_text, parents=[common_parser])

    # pub
    pub_parser = add_command(
        'pub', 'publish topic from command line', pub_common_parser)
    pub_parser.add_argument('topic_name', type=str, help='name of topic')
    pub_parser.add_argument(
        'data', type=str, help='json string data to be published')
    pub_parser.add_argument('--repeat', '-r', help='repeat in hz', type=float)

    # echo
    echo_parser = add_command('echo', 'show topic data', sub_common_parser)
    echo_parser.add_argument('topic_name', type=str, help='name of topic')
    echo_parser.add_argument(
        '--num', '-n', help='print N times and exit', type=int, default=None)

    # list
    list_parser = add_command('list', 'show topic list', sub_common_parser)
    list_parser.add_argument(
        '--timeout', '-t', help='timeout in sec', type=float, default=1.0)

    # record
    record_parser = add_command(
        'record', 'record topic data', sub_common_parser)
    record_parser.add_argument(
        'topic_names', nargs='*', help='topic names to be recorded', type=str)
    record_parser.add_argument(
        '--file', '-f', help='output file name (default: record.json)',
        type=str, default='record.json')

    # play
    play_parser = add_command(
        'play', 'play recorded topic data', pub_common_parser)
    play_parser.add_argument('file', type=str, help='input file name')

    args = parser.parse_args()

    # Each handler reads its own attributes lazily, so only the attributes of
    # the selected sub-command are ever accessed.
    handlers = {
        'pub': lambda: pub(args.topic_name, args.data,
                           repeat_rate=args.repeat,
                           host=args.host, pub_port=args.publisher_port),
        'echo': lambda: echo(args.topic_name, args.num,
                             host=args.host, sub_port=args.subscriber_port),
        'list': lambda: show_list(args.timeout, host=args.host,
                                  sub_port=args.subscriber_port),
        'record': lambda: record(args.file, args.topic_names,
                                 host=args.host,
                                 sub_port=args.subscriber_port),
        'play': lambda: play(args.file, host=args.host,
                             pub_port=args.publisher_port),
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler()