Skip to main content

Post-processing tools 2023 R2

ensight_grpc.py

Last update: 17.04.2023
1 import grpc
2 import os, os.path
3 import platform
4 import random
5 import subprocess
6 import threading
7 import uuid
8 from concurrent import futures
9 
10 # these modules are the result of running protoc on the .proto file
11 import ensight_pb2
12 import ensight_pb2_grpc
13 import dynamic_scene_graph_pb2
14 import dynamic_scene_graph_pb2_grpc
15 
16 # Shared memory interface
17 import ensight_grpc_shmem
18 
19 
20 
28 
29 
class EnSightGRPC(object):
    """Python binding for the EnSight client gRPC API.

    The class wraps a gRPC channel to an EnSight gRPC server.  It can launch
    the EnSight process itself, send render/geometry/Python requests, and
    receive image and event streams either over gRPC or (for images) through
    a shared memory file.
    """

    def __init__(self, port=12345, host='127.0.0.1', version=''):
        """Create an instance of the EnSight gRPC interface wrapper.

        Args:
            port: port number of the EnSight gRPC server.
            host: hostname of the EnSight gRPC server.
            version: suffix appended to the ``ensight`` executable name when
                the server is launched by start_server().
        """
        self._host = host
        self._port = port
        self._pid = None
        self._channel = None
        self._stub = None
        self._dsg_stub = None
        self._version = version
        # NOTE(review): ``random`` is not a cryptographic source; if this token
        # is meant to be a real secret, ``secrets`` would be the better choice.
        self._security_token = str(random.randint(0, 1000000))
        # Streaming APIs
        # Images
        self._image_stream = None
        self._image_thread = None
        self._image = None
        self._image_number = 0
        self._shmem_client = None
        self._shmem_filename = None
        # Event (strings)
        self._event_stream = None
        self._event_thread = None
        self._events = list()
        self._prefix = None
        self._sub_service = None

    def host(self):
        """Get the hostname for this connection."""
        return self._host

    def port(self):
        """Get the port number for this connection."""
        return self._port

    def set_security_token(self, n):
        """Set the security token for the gRPC connection.

        Args:
            n: the new token.  Strings and bytes are used as-is; any other
                value is converted with ``str()`` where a string is needed.
        """
        self._security_token = n

    def security_token(self):
        """Return the security token for the gRPC connection."""
        return self._security_token

    def shutdown(self):
        """Shut down all gRPC connections and release shared memory resources."""
        # remove any subscription services we may have started
        self.unsubscribe()
        # if we launched EnSight, shut it down.
        if self._pid is not None:
            self.stop_server()
        # shutdown any shared memory client
        if self._shmem_client is not None:
            ensight_grpc_shmem.stream_destroy(self._shmem_client)
            self._shmem_client = None
        # destroy any shared memory file we generated
        if self._shmem_filename is not None:
            try:
                os.remove(self._shmem_filename)
            except Exception as e:
                print("Unable to delete shared memory file: {}".format(str(e)))
            self._shmem_filename = None

    def _build_server_command(self):
        """Return the platform-independent command line used to launch EnSight."""
        cmd = [f'ensight{self._version}', '-batch', '-grpc_server', str(self._port)]
        token = self._security_token
        if token:
            # Popen only accepts string arguments; the token may have been
            # replaced by a non-string via set_security_token().
            if isinstance(token, bytes):
                token = token.decode("utf-8")
            cmd.append("-security")
            cmd.append(str(token))
        return cmd

    def start_server(self):
        """Start an EnSight gRPC server instance.

        If a server was already started by this object, its process id is
        returned and nothing new is launched.

        Returns:
            The process id of the launched EnSight instance.
        """
        if self._pid is not None:
            return self._pid

        my_env = os.environ.copy()
        cmd = self._build_server_command()
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            cmd[0] += ".bat"
            cmd.append('-minimize_console')
            self._pid = subprocess.Popen(cmd, creationflags=DETACHED_PROCESS, close_fds=True, env=my_env).pid
        else:
            # Pass the full command line: launching the bare executable would
            # start EnSight without the gRPC server, port or security token.
            self._pid = subprocess.Popen(cmd, close_fds=True, env=my_env).pid
        return self._pid

    def stop_server(self):
        """Shut down any gRPC connection made by this class.

        The 'exit' message is only sent when this object started the server
        and is currently connected to it.

        Returns:
            The reply of the Exit request, or None if no request was sent.
        """
        response = None
        # if we are connected and we started the server, we will emit the 'exit' message
        if (self._pid is not None) and self.is_connected():
            response = self._stub.Exit(ensight_pb2.ExitRequest(), metadata=self.metadata())
        self._stub = None
        self._dsg_stub = None
        # the channel may never have been opened (or the connect timed out)
        if self._channel is not None:
            self._channel.close()
        self._channel = None
        self._pid = None
        return response

    def is_connected(self):
        """Check if a gRPC connection has been established."""
        return self._channel is not None

    def connect(self, timeout=15.0):
        """Establish a connection to an EnSight gRPC server.

        Does nothing if a channel already exists.  If the channel does not
        become ready within ``timeout`` the object stays disconnected.

        Args:
            timeout: number of seconds to wait for the channel to be ready.
        """
        if self._channel is not None:
            return
        channel = grpc.insecure_channel("{}:{}".format(self._host, self._port),
                                        options=[('grpc.max_receive_message_length', -1)])
        try:
            grpc.channel_ready_future(channel).result(timeout=timeout)
        except grpc.FutureTimeoutError:
            # release the half-open channel instead of leaking it
            channel.close()
            self._channel = None
            return
        self._channel = channel
        self._stub = ensight_pb2_grpc.EnSightServiceStub(self._channel)
        self._dsg_stub = dynamic_scene_graph_pb2_grpc.DynamicSceneGraphServiceStub(self._channel)

    def metadata(self):
        """Build the per-call gRPC metadata list.

        Returns:
            A list holding a single ``(b'shared_secret', token_bytes)`` pair
            when a security token is set, otherwise an empty list.
        """
        ret = list()
        if self._security_token is not None:
            s = self._security_token
            if not isinstance(s, bytes):
                # covers str as well as tokens set as e.g. integers
                s = str(s).encode("utf-8")
            ret.append((b'shared_secret', s))
        return ret

    def render(self, width=640, height=480, aa=1, raw=False, highlighting=False):
        """Render the current EnSight screen and return the image.

        Args:
            width: requested image width.
            height: requested image height.
            aa: number of antialiasing passes.
            raw: request IMAGE_RAW instead of IMAGE_PNG.
            highlighting: include highlighting in the rendered image.

        Returns:
            The ``value`` field of the RenderImage reply.

        Raises:
            IOError: if the gRPC call fails.
        """
        self.connect()
        ret_type = ensight_pb2.RenderRequest.IMAGE_PNG
        if raw:
            ret_type = ensight_pb2.RenderRequest.IMAGE_RAW
        try:
            response = self._stub.RenderImage(ensight_pb2.RenderRequest(type=ret_type, image_width=width,
                                                                        image_height=height, image_aa_passes=aa,
                                                                        include_highlighting=highlighting),
                                              metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        return response.value

    def geometry(self):
        """Return the current scene geometry in glTF format.

        Returns:
            The ``value`` field of the GetGeometry reply (GEOMETRY_GLB request).

        Raises:
            IOError: if the gRPC call fails.
        """
        self.connect()
        try:
            response = self._stub.GetGeometry(
                ensight_pb2.GeometryRequest(type=ensight_pb2.GeometryRequest.GEOMETRY_GLB),
                metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        return response.value

    def command(self, command_string, do_eval=True, json=False):
        """Send a Python command string to be executed in EnSight.

        Args:
            command_string: the Python source to run remotely.
            do_eval: when False no result is requested and None is returned.
            json: when True the reply value is requested as JSON and returned
                unparsed; otherwise the reply is evaluated as Python.

        Returns:
            None, the evaluated Python value, or the raw reply value.

        Raises:
            IOError: if the gRPC call fails.
            RuntimeError: if the reply reports a remote execution error.
        """
        self.connect()
        flags = ensight_pb2.PythonRequest.EXEC_RETURN_PYTHON
        if json:
            flags = ensight_pb2.PythonRequest.EXEC_RETURN_JSON
        if not do_eval:
            flags = ensight_pb2.PythonRequest.EXEC_NO_RESULT
        try:
            response = self._stub.RunPython(ensight_pb2.PythonRequest(type=flags, command=command_string),
                                            metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        if response.error < 0:
            raise RuntimeError("Remote execution error")
        if flags == ensight_pb2.PythonRequest.EXEC_NO_RESULT:
            return None
        elif flags == ensight_pb2.PythonRequest.EXEC_RETURN_PYTHON:
            # SECURITY NOTE(review): this evaluates text produced by the remote
            # server; only connect to EnSight instances you trust.
            return eval(response.value)
        return response.value

    def image_stream_enable(self, flip_vertical=False):
        """Enable a simple gRPC-based image stream from EnSight.

        Starts a daemon thread running poll_images().  Does nothing if the
        stream is already enabled.

        Args:
            flip_vertical: forwarded to the ImageStreamRequest.
        """
        if self._image_stream is not None:
            return
        self.connect()
        self._image_stream = self._stub.GetImageStream(
            ensight_pb2.ImageStreamRequest(flip_vertical=flip_vertical, chunk=True), metadata=self.metadata())
        self._image_thread = threading.Thread(target=self.poll_images)
        self._image_thread.daemon = True
        self._image_thread.start()

    def get_image(self):
        """Retrieve the current EnSight image.

        Returns:
            A ``(image, image_number)`` tuple; ``image`` is None until a
            first image has been stored.
        """
        return self._image, self._image_number

    def put_image(self, the_image):
        """Store ``the_image`` as the current image and bump the image counter."""
        self._image = the_image
        self._image_number += 1

    def poll_images(self):
        """Thread body: pull images from shared memory and/or the gRPC stream.

        Runs while a stub exists and at least one image source is active.
        Any exception is treated as a broken connection and resets the image
        stream state.
        """
        try:
            while self._stub is not None:
                if self._shmem_client is None and self._image_stream is None:
                    # nothing left to poll (e.g. after shutdown()); leave
                    # instead of spinning at full CPU
                    self._image_thread = None
                    break

                if self._shmem_client is not None:
                    img = ensight_grpc_shmem.stream_lock(self._shmem_client)
                    if type(img) is dict:
                        the_image = dict(pixels=img['pixeldata'], width=img['width'], height=img['height'])
                        self.put_image(the_image)
                        ensight_grpc_shmem.stream_unlock(self._shmem_client)

                if self._image_stream is not None:
                    img = next(self._image_stream)
                    buffer = img.pixels

                    # an image may arrive in several chunks; the last is flagged 'final'
                    while not img.final:
                        img = next(self._image_stream)
                        buffer += img.pixels

                    the_image = dict(pixels=buffer, width=img.width, height=img.height)
                    self.put_image(the_image)
        except Exception:
            # signal that the gRPC connection has broken
            self._image_stream = None
            self._image_thread = None
            self._image = None

    def image_stream_is_enabled(self):
        """Check to see if the image stream is enabled."""
        return self._image_stream is not None

    def prefix(self):
        """Return the unique prefix for this instance.

        The prefix is generated on first use and then reused.
        """
        # prefix URIs will have the format: "grpc://{uuid}/{callbackname}?enum={}&uid={}"
        if self._prefix is None:
            self._prefix = "grpc://" + str(uuid.uuid1()) + "/"
        return self._prefix

    def event_stream_enable(self):
        """Enable a simple gRPC-based event stream from EnSight.

        Starts a daemon thread running poll_events().  Does nothing if the
        stream is already enabled.
        """
        if self._event_stream is not None:
            return
        self.connect()
        self._event_stream = self._stub.GetEventStream(ensight_pb2.EventStreamRequest(prefix=self.prefix()),
                                                       metadata=self.metadata())
        self._event_thread = threading.Thread(target=self.poll_events)
        self._event_thread.daemon = True
        self._event_thread.start()

    def dynamic_scene_graph_stream(self, client_cmds):
        """Open up a dynamic scene graph stream.

        Args:
            client_cmds: the client command stream passed to GetSceneStream.

        Returns:
            The stream returned by the GetSceneStream call.
        """
        self.connect()
        return self._dsg_stub.GetSceneStream(client_cmds, metadata=self.metadata())

    def event_stream_is_enabled(self):
        """Check to see if the event stream is enabled."""
        return self._event_stream is not None

    def get_event(self):
        """Retrieve and remove the oldest queued event.

        Returns:
            The oldest event, or None if no event is queued.
        """
        try:
            return self._events.pop(0)
        except IndexError:
            return None

    def put_event(self, evt):
        """Append ``evt`` to the queue of received events."""
        self._events.append(evt)

    def poll_events(self):
        """Thread body: move events from the gRPC event stream into the queue.

        Any exception is treated as a broken connection and resets the event
        stream state.
        """
        try:
            while self._stub is not None:
                evt = next(self._event_stream)
                self.put_event(evt)
        except Exception:
            # signal that the gRPC connection has broken
            self._event_stream = None
            self._event_thread = None

    def start_sub_service(self):
        """Start the local subscription gRPC server if it is not running yet.

        Failures are not raised; they leave the subscription service unset.
        """
        try:
            if self._sub_service is not None:
                return
            self._sub_service = EnSightSubServicer(parent=self)
            self._sub_service.start()
        except Exception:
            self._sub_service = None

    def subscribe_events(self):
        """Subscribe to an event stream using a locally launched gRPC server."""
        self.start_sub_service()
        self.connect()
        conn_type = ensight_pb2.SubscribeEventOptions.GRPC
        options = dict(uri=self._sub_service._uri)
        event_options = ensight_pb2.SubscribeEventOptions(prefix=self.prefix(), type=conn_type, options=options)
        self._stub.SubscribeEvents(event_options, metadata=self.metadata())

    def subscribe_images(self, flip_vertical=False, use_shmem=True):
        """Subscribe to an image stream.

        Shared memory transport is tried first when ``use_shmem`` is set; if
        it cannot be set up, the locally launched gRPC server is used.

        Args:
            flip_vertical: forwarded to the SubscribeImageOptions.
            use_shmem: try the shared memory transport before gRPC.
        """
        self.connect()
        if use_shmem:
            try:
                # we need a shared memory file
                self._shmem_filename = self.find_filename()
                if self._shmem_filename is not None:
                    conn_type = ensight_pb2.SubscribeImageOptions.SHARED_MEM
                    options = dict(filename=self._shmem_filename)
                    image_options = ensight_pb2.SubscribeImageOptions(prefix=self.prefix(), type=conn_type,
                                                                      options=options, flip_vertical=flip_vertical,
                                                                      chunk=False)
                    self._stub.SubscribeImages(image_options, metadata=self.metadata())
                    # start the local server
                    self._shmem_client = ensight_grpc_shmem.stream_create(self._shmem_filename)
                    # turn on the polling thread
                    self._image_thread = threading.Thread(target=self.poll_images)
                    self._image_thread.daemon = True
                    self._image_thread.start()
                    return
            except Exception as e:
                print("Unable to subscribe to an image stream via shared memory: {}".format(str(e)))

        self.start_sub_service()
        conn_type = ensight_pb2.SubscribeImageOptions.GRPC
        options = dict(uri=self._sub_service._uri)
        image_options = ensight_pb2.SubscribeImageOptions(prefix=self.prefix(), type=conn_type, options=options,
                                                          flip_vertical=flip_vertical, chunk=True)
        self._stub.SubscribeImages(image_options, metadata=self.metadata())

    def unsubscribe(self):
        """Unsubscribe from any image or event streams that have been subscribed to.

        Only acts when connected and a local subscription service exists.
        """
        if self.is_connected():
            if self._sub_service is not None:
                prefix = ensight_pb2.Prefix(prefix=self.prefix())
                self._stub.Unsubscribe(prefix, metadata=self.metadata())
                # stop the local subscription server so its listener and
                # worker threads are not leaked
                server = getattr(self._sub_service, "_server", None)
                if server is not None:
                    server.stop(None)
                self._sub_service = None

    @classmethod
    def find_filename(cls, size=1024*1024*25):
        """Create a zero-filled file for the shared memory transport.

        Up to 100 candidate names of the form ``shmem_<pid+i>.bin`` in the
        current working directory are tried.

        Args:
            size: number of zero bytes to write (default 25MB).

        Returns:
            The name of the created file, or None if none could be created.
        """
        for i in range(100):
            filename = os.path.join(os.getcwd(), "shmem_{}.bin".format(os.getpid()+i))
            try:
                # 'x' creates the file exclusively, so an existing file is
                # never overwritten and there is no exists()/open() race
                tmp = open(filename, "xb")
            except OSError:
                continue
            try:
                with tmp:
                    tmp.write(b'\0'*size)
            except OSError:
                # do not leave a partially written file behind
                try:
                    os.remove(filename)
                except OSError:
                    pass
                continue
            return filename
        return None
525 
526 
527 
class EnSightSubServicer(ensight_pb2_grpc.EnSightSubscriptionServicer):
    """Local gRPC servicer that receives events and images published by EnSight.

    Received events and images are forwarded to the ``parent`` object through
    its ``put_event()`` and ``put_image()`` methods.
    """

    def __init__(self, parent=None):
        """Create the servicer.

        Args:
            parent: object receiving the published events and images, or
                None to discard them.
        """
        self._server = None
        self._uri = ""
        self._parent = parent

    def PublishEvent(self, request, context):
        """Handle a published event by handing it to the parent."""
        if self._parent is not None:
            self._parent.put_event(request)
        return ensight_pb2.GenericResponse(str='Event Published')

    def PublishImage(self, request_iterator, context):
        """Handle a published image, reassembling it from its chunks.

        Chunks are read from ``request_iterator`` until one is flagged
        ``final``; the pixel data is concatenated and the width and height
        are taken from the last chunk.
        """
        img = next(request_iterator)
        buffer = img.pixels
        while not img.final:
            img = next(request_iterator)
            buffer += img.pixels
        the_image = dict(pixels=buffer, width=img.width, height=img.height)
        if self._parent is not None:
            self._parent.put_image(the_image)
        return ensight_pb2.GenericResponse(str='Image Published')

    def start(self):
        """Start the gRPC server on localhost using a random free port.

        Raises:
            RuntimeError: if no port could be bound.
        """
        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        ensight_pb2_grpc.add_EnSightSubscriptionServicer_to_server(
            self, self._server)
        # Start the server on localhost with a random port
        port = self._server.add_insecure_port('localhost:0')
        if not port:
            # a return value of 0 means the bind failed; do not advertise 'localhost:0'
            self._server = None
            raise RuntimeError("Unable to bind a local port for the subscription service")
        self._uri = 'localhost:' + str(port)
        self._server.start()

    def stop(self, grace=None):
        """Stop the gRPC server if it is running.

        Args:
            grace: grace period passed to the server's ``stop()`` call; None
                stops it immediately.
        """
        if self._server is not None:
            self._server.stop(grace)
            self._server = None
            self._uri = ""
571 
572 
Python binding for the EnSight client gRPC API.
Definition: ensight_grpc.py:35
def prefix(self)
return the unique prefix for this instance
def connect(self, timeout=15.0)
establish a connection to an EnSight gRPC server
def image_stream_enable(self, flip_vertical=False)
enable a simple gRPC-based image stream from EnSight
def event_stream_is_enabled(self)
check to see if the event stream is enabled
def put_image(self, the_image)
def command(self, command_string, do_eval=True, json=False)
send a Python command string to be executed in EnSight
def set_security_token(self, n)
set the security token for the gRPC connection.
Definition: ensight_grpc.py:90
def image_stream_is_enabled(self)
check to see if the image stream is enabled
def subscribe_events(self)
subscribe to an event stream using a locally launched gRPC server
def shutdown(self)
shut down all gRPC connections
def find_filename(cls, size=1024 *1024 *25)
def start_server(self)
Start an EnSight gRPC server instance.
def security_token(self)
return the security token for the gRPC connection.
Definition: ensight_grpc.py:96
def unsubscribe(self)
unsubscribe from any image or event streams that have been subscribed to
def get_image(self)
retrieve the current EnSight image
def render(self, width=640, height=480, aa=1, raw=False, highlighting=False)
render the current EnSight screen and return the image
def host(self)
get the hostname for this connection
Definition: ensight_grpc.py:72
def __init__(self, port=12345, host='127.0.0.1', version='')
create an instance of the EnSight gRPC interface wrapper
Definition: ensight_grpc.py:45
def is_connected(self)
check if a gRPC connection has been established
def event_stream_enable(self)
enable a simple gRPC-based event stream from EnSight
def subscribe_images(self, flip_vertical=False, use_shmem=True)
subscribe to an image stream
def stop_server(self)
shut down any gRPC connection made by this class
def put_event(self, evt)
def port(self)
get the port number for this connection
Definition: ensight_grpc.py:78
def geometry(self)
return the current scene geometry in glTF format
def get_event(self)
retrieve and remove the oldest ensightservice::EventReply string
def dynamic_scene_graph_stream(self, client_cmds)
open up a dynamic scene graph stream
rpc GetSceneStream(stream SceneClientCommand) returns(stream SceneUpdateCommand)
Start a new dynamic scene graph stream.
rpc SubscribeImages(SubscribeImageOptions subscribeimageoptions) returns(GenericResponse)
Definition: ensight.proto:93
rpc RunPython(PythonRequest pythonrequest) returns(PythonReply)
Definition: ensight.proto:40
rpc Unsubscribe(Prefix prefix) returns(GenericResponse)
Definition: ensight.proto:100
rpc Exit(ExitRequest exitrequest) returns(ExitReply)
Definition: ensight.proto:52
rpc SubscribeEvents(SubscribeEventOptions subscribeeventoptions) returns(GenericResponse)
Definition: ensight.proto:81
rpc GetEventStream(EventStreamRequest eventstreamrequest) returns(stream EventReply)
Definition: ensight.proto:70
rpc RenderImage(RenderRequest renderrequest) returns(RenderReply)
Definition: ensight.proto:44
rpc GetImageStream(ImageStreamRequest imagestreamrequest) returns(stream ImageReply)
Definition: ensight.proto:106
rpc GetGeometry(GeometryRequest geometryrequest) returns(GeometryReply)
Definition: ensight.proto:48
rpc PublishImage(stream ImageReply) returns(GenericResponse)
Definition: ensight.proto:136
rpc PublishEvent(EventReply eventreply) returns(GenericResponse)
Definition: ensight.proto:140