Skip to main content

Post-processing tools 2024 R1

ensight_grpc

Last update: 16.07.2025
1import grpc
2import os, os.path
3import platform
4import random
5import subprocess
6import threading
7import uuid
8from concurrent import futures
9
10# these modules are the result of running protoc on the .proto file
11import ensight_pb2
12import ensight_pb2_grpc
13import dynamic_scene_graph_pb2
14import dynamic_scene_graph_pb2_grpc
15
16# Shared memory interface
17import ensight_grpc_shmem
18
19
20
28
29
class EnSightGRPC(object):
    """Python binding for the EnSight client gRPC API.

    Wraps a gRPC channel to an EnSight instance and exposes rendering,
    geometry export, remote Python execution, and image/event streaming.
    An instance can optionally launch the EnSight process itself
    (``start_server``) and shut it down again (``stop_server``/``shutdown``).
    """

    def __init__(self, port=12345, host='127.0.0.1', version=''):
        """Create an instance of the EnSight gRPC interface wrapper.

        Args:
            port: Port number of the EnSight gRPC server.
            host: Hostname or address of the EnSight gRPC server.
            version: Suffix appended to the ``ensight`` executable name when
                ``start_server`` launches EnSight.
        """
        self._host = host
        self._port = port
        self._pid = None
        self._channel = None
        self._stub = None
        self._dsg_stub = None
        self._version = version
        # The token is a shared secret, so draw it from the OS entropy source
        # rather than the default (predictable) pseudo-random generator.
        self._security_token = str(random.SystemRandom().randint(0, 1000000))
        # Streaming APIs
        # Images
        self._image_stream = None
        self._image_thread = None
        self._image = None
        self._image_number = 0
        self._shmem_client = None
        self._shmem_filename = None
        # Event (strings)
        self._event_stream = None
        self._event_thread = None
        self._events = list()
        self._prefix = None
        self._sub_service = None
        self._event_callback = None

    def host(self):
        """Return the hostname for this connection."""
        return self._host

    def port(self):
        """Return the port number for this connection."""
        return self._port

    def set_security_token(self, n):
        """Set the security token for the gRPC connection.

        Args:
            n: The new token. ``None`` disables the ``shared_secret`` metadata
                entry produced by ``metadata``.
        """
        self._security_token = n

    def security_token(self):
        """Return the security token for the gRPC connection."""
        return self._security_token

    def shutdown(self):
        """Shut down all gRPC connections and release shared-memory resources."""
        # remove any subscription services we may have started
        self.unsubscribe()
        # if we launched EnSight, shut it down.
        if self._pid is not None:
            _ = self.stop_server()
        # shutdown any shared memory client
        if self._shmem_client is not None:
            ensight_grpc_shmem.stream_destroy(self._shmem_client)
            self._shmem_client = None
        # destroy any shared memory file we generated
        if self._shmem_filename is not None:
            try:
                os.remove(self._shmem_filename)
            except Exception as e:
                # best effort: report the failure but keep shutting down
                print("Unable to delete shared memory file: {}".format(str(e)))
            self._shmem_filename = None

    def start_server(self):
        """Start an EnSight gRPC server instance.

        Does nothing if this object has already launched a server.

        Returns:
            The process id of the launched EnSight process.
        """
        if self._pid is not None:
            return self._pid

        my_env = os.environ.copy()
        cmd = [f'ensight{self._version}', '-batch', '-grpc_server', str(self._port)]
        if self._security_token:
            token = self._security_token
            # argv entries must be text; set_security_token() may have been
            # handed bytes or a number.
            if isinstance(token, bytes):
                token = token.decode("utf-8")
            cmd.append("-security")
            cmd.append(str(token))
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            cmd[0] += ".bat"
            cmd.append('-minimize_console')
            self._pid = subprocess.Popen(cmd, creationflags=DETACHED_PROCESS,
                                         close_fds=True, env=my_env).pid
        else:
            # Pass the full command line, not just the executable name, so the
            # batch/gRPC/security options actually reach EnSight.
            self._pid = subprocess.Popen(cmd, close_fds=True, env=my_env).pid
        return self._pid

    def stop_server(self):
        """Shut down any gRPC connection made by this class.

        If this object launched EnSight and is connected to it, an Exit
        request is sent and the local channel/stubs are released.

        Returns:
            The response of the Exit request, or ``None`` if none was sent.
        """
        response = None
        # if we are connected and we started the server, we will emit the 'exit' message
        if (self._pid is not None) and self.is_connected():
            try:
                response = self._stub.Exit(ensight_pb2.ExitRequest(),
                                           metadata=self.metadata())
            finally:
                # release local state even if the Exit call itself fails
                self._stub = None
                self._dsg_stub = None
                self._channel.close()
                self._channel = None
                self._pid = None
        return response

    def is_connected(self):
        """Check if a gRPC connection has been established."""
        return self._channel is not None

    def connect(self, timeout=15.0):
        """Establish a connection to an EnSight gRPC server.

        Does nothing if a channel already exists. If the channel does not
        become ready within ``timeout`` the object stays unconnected; no
        exception is raised.

        Args:
            timeout: Seconds to wait for the channel to become ready.
        """
        if self._channel is not None:
            return
        self._channel = grpc.insecure_channel("{}:{}".format(self._host, self._port),
                                              options=[
                                                  ("grpc.max_receive_message_length", -1),
                                                  ("grpc.max_send_message_length", -1),
                                                  ("grpc.testing.fixed_reconnect_backoff_ms", 1100),
                                              ])
        try:
            grpc.channel_ready_future(self._channel).result(timeout=timeout)
        except grpc.FutureTimeoutError:
            # close the half-open channel instead of just dropping the reference
            self._channel.close()
            self._channel = None
            return
        self._stub = ensight_pb2_grpc.EnSightServiceStub(self._channel)
        self._dsg_stub = dynamic_scene_graph_pb2_grpc.DynamicSceneGraphServiceStub(self._channel)

    def metadata(self):
        """Build the gRPC call metadata carrying the security token.

        Returns:
            A list that is empty when no token is set, otherwise a single
            ``(b'shared_secret', token_bytes)`` tuple.
        """
        ret = list()
        if self._security_token is not None:
            s = self._security_token
            if not isinstance(s, bytes):
                # str tokens are encoded; other types (e.g. int) go via str()
                s = str(s).encode("utf-8")
            ret.append((b'shared_secret', s))
        return ret

    def render(self, width=640, height=480, aa=1, raw=False, highlighting=False):
        """Render the current EnSight screen and return the image.

        Args:
            width: Image width passed to the render request.
            height: Image height passed to the render request.
            aa: Number of anti-aliasing passes passed to the render request.
            raw: Request ``IMAGE_RAW`` instead of ``IMAGE_PNG``.
            highlighting: Value of ``include_highlighting`` in the request.

        Returns:
            The ``value`` field of the render response.

        Raises:
            IOError: If the remote call fails.
        """
        self.connect()
        ret_type = ensight_pb2.RenderRequest.IMAGE_PNG
        if raw:
            ret_type = ensight_pb2.RenderRequest.IMAGE_RAW
        try:
            response = self._stub.RenderImage(
                ensight_pb2.RenderRequest(type=ret_type, image_width=width,
                                          image_height=height, image_aa_passes=aa,
                                          include_highlighting=highlighting),
                metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        return response.value

    def geometry(self):
        """Return the current scene geometry in glTF format.

        Returns:
            The ``value`` field of the ``GEOMETRY_GLB`` geometry response.

        Raises:
            IOError: If the remote call fails.
        """
        self.connect()
        try:
            response = self._stub.GetGeometry(
                ensight_pb2.GeometryRequest(type=ensight_pb2.GeometryRequest.GEOMETRY_GLB),
                metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        return response.value

    def command(self, command_string, do_eval=True, json=False, raw_eval=False):
        """Send a Python command string to be executed in EnSight.

        Args:
            command_string: The Python source to run remotely.
            do_eval: If false, no result is requested and ``None`` is returned.
            json: Request the result with ``EXEC_RETURN_JSON`` and return the
                response value unmodified.
            raw_eval: With the default Python return mode, return the response
                value as received instead of evaluating it locally.

        Returns:
            ``None``, the locally evaluated result, or the raw response value,
            depending on the flags above.

        Raises:
            IOError: If the remote call fails.
            RuntimeError: If the response reports a negative error code.
        """
        self.connect()
        flags = ensight_pb2.PythonRequest.EXEC_RETURN_PYTHON
        if json:
            flags = ensight_pb2.PythonRequest.EXEC_RETURN_JSON
        if not do_eval:
            flags = ensight_pb2.PythonRequest.EXEC_NO_RESULT
        try:
            response = self._stub.RunPython(ensight_pb2.PythonRequest(type=flags,
                                                                      command=command_string),
                                            metadata=self.metadata())
        except Exception as e:
            raise IOError("gRPC connection dropped") from e
        if response.error < 0:
            raise RuntimeError("Remote execution error")
        if flags == ensight_pb2.PythonRequest.EXEC_NO_RESULT:
            return None
        if flags == ensight_pb2.PythonRequest.EXEC_RETURN_PYTHON and not raw_eval:
            # NOTE(security): eval() executes whatever text the server sent
            # back. Only use against a trusted EnSight instance, or pass
            # raw_eval=True / json=True and parse the value yourself.
            return eval(response.value)
        return response.value

    def image_stream_enable(self, flip_vertical=False):
        """Enable a simple gRPC-based image stream from EnSight.

        Starts a daemon thread running ``poll_images``. Does nothing if an
        image stream is already active.

        Args:
            flip_vertical: Value of ``flip_vertical`` in the stream request.
        """
        if self._image_stream is not None:
            return
        self.connect()
        self._image_stream = self._stub.GetImageStream(
            ensight_pb2.ImageStreamRequest(flip_vertical=flip_vertical, chunk=True),
            metadata=self.metadata())
        self._image_thread = threading.Thread(target=self.poll_images)
        self._image_thread.daemon = True
        self._image_thread.start()

    def get_image(self):
        """Retrieve the current EnSight image.

        Returns:
            A ``(image, image_number)`` tuple. ``image`` is ``None`` or a dict
            with ``pixels``, ``width`` and ``height`` keys; ``image_number``
            counts how many images have been stored so far.
        """
        return self._image, self._image_number

    def put_image(self, the_image):
        """Store ``the_image`` as the current image and bump the image counter."""
        self._image = the_image
        self._image_number += 1

    def poll_images(self):
        """Thread body: pull images from shared memory and/or the gRPC stream.

        Loops while a service stub exists. Any exception ends the loop and
        clears the image stream state.
        """
        try:
            while self._stub is not None:
                if self._shmem_client is not None:
                    img = ensight_grpc_shmem.stream_lock(self._shmem_client)
                    if type(img) is dict:
                        the_image = dict(pixels=img['pixeldata'],
                                         width=img['width'],
                                         height=img['height'])
                        self.put_image(the_image)
                        ensight_grpc_shmem.stream_unlock(self._shmem_client)

                if self._image_stream is not None:
                    # An image may arrive as several chunks; the last one has
                    # ``final`` set. Collect and join once instead of repeated
                    # concatenation (assumes ``pixels`` is a bytes field).
                    img = next(self._image_stream)
                    chunks = [img.pixels]
                    while not img.final:
                        img = next(self._image_stream)
                        chunks.append(img.pixels)

                    the_image = dict(pixels=b"".join(chunks),
                                     width=img.width, height=img.height)
                    self.put_image(the_image)
        except Exception:
            # signal that the gRPC connection has broken
            self._image_stream = None
            self._image_thread = None
            self._image = None

    def image_stream_is_enabled(self):
        """Check to see if the image stream is enabled."""
        return self._image_stream is not None

    def prefix(self):
        """Return the unique prefix for this instance (created on first use)."""
        # prefix URIs will have the format: "grpc://{uuid}/{callbackname}?enum={}&uid={}"
        if self._prefix is None:
            self._prefix = "grpc://" + str(uuid.uuid1()) + "/"
        return self._prefix

    def event_stream_enable(self, callback=None, prefix=None):
        """Enable a simple gRPC-based event stream from EnSight.

        Starts a daemon thread running ``poll_events``. Does nothing if an
        event stream is already active.

        Args:
            callback: Optional callable. When set, ``put_event`` calls it with
                each event's ``tag`` instead of queueing the event.
            prefix: Event prefix to request; defaults to ``self.prefix()``.
        """
        if self._event_stream is not None:
            return
        self._event_callback = callback
        self.connect()
        pfx = prefix
        if pfx is None:
            pfx = self.prefix()
        self._event_stream = self._stub.GetEventStream(
            ensight_pb2.EventStreamRequest(prefix=pfx), metadata=self.metadata())
        self._event_thread = threading.Thread(target=self.poll_events)
        self._event_thread.daemon = True
        self._event_thread.start()

    def dynamic_scene_graph_stream(self, client_cmds):
        """Open up a dynamic scene graph stream.

        Args:
            client_cmds: Passed through to ``GetSceneStream``.

        Returns:
            Whatever ``GetSceneStream`` returns.
        """
        self.connect()
        return self._dsg_stub.GetSceneStream(client_cmds, metadata=self.metadata())

    def event_stream_is_enabled(self):
        """Check to see if the event stream is enabled."""
        return self._event_stream is not None

    def get_event(self):
        """Retrieve and remove the oldest queued event.

        Returns:
            The oldest event, or ``None`` if the queue is empty.
        """
        try:
            return self._events.pop(0)
        except IndexError:
            return None

    def put_event(self, evt):
        """Deliver ``evt`` to the callback (as ``evt.tag``) or queue it."""
        if self._event_callback:
            self._event_callback(evt.tag)
            return
        self._events.append(evt)

    def poll_events(self):
        """Thread body: forward events from the gRPC stream to ``put_event``.

        Loops while a service stub exists. Any exception ends the loop and
        clears the event stream state.
        """
        try:
            while self._stub is not None:
                evt = next(self._event_stream)
                self.put_event(evt)
        except Exception:
            # signal that the gRPC connection has broken
            self._event_stream = None
            self._event_thread = None

    def start_sub_service(self):
        """Start the local subscription gRPC server if it is not running.

        On failure ``self._sub_service`` is left as ``None``.
        """
        try:
            if self._sub_service is not None:
                return
            self._sub_service = EnSightSubServicer(parent=self)
            self._sub_service.start()
        except Exception:
            self._sub_service = None

    def subscribe_events(self):
        """Subscribe to an event stream using a locally launched gRPC server."""
        self.start_sub_service()
        self.connect()
        conn_type = ensight_pb2.SubscribeEventOptions.GRPC
        options = dict(uri=self._sub_service._uri)
        event_options = ensight_pb2.SubscribeEventOptions(prefix=self.prefix(), type=conn_type,
                                                          options=options)
        _ = self._stub.SubscribeEvents(event_options, metadata=self.metadata())

    def subscribe_images(self, flip_vertical=False, use_shmem=True):
        """Subscribe to an image stream.

        Tries a shared-memory transport first when ``use_shmem`` is true and
        falls back to the locally launched gRPC server otherwise or on error.

        Args:
            flip_vertical: Value of ``flip_vertical`` in the subscription.
            use_shmem: Attempt the shared-memory transport first.
        """
        self.connect()
        if use_shmem:
            try:
                # we need a shared memory file
                self._shmem_filename = self.find_filename()
                if self._shmem_filename is not None:
                    conn_type = ensight_pb2.SubscribeImageOptions.SHARED_MEM
                    options = dict(filename=self._shmem_filename)
                    image_options = ensight_pb2.SubscribeImageOptions(prefix=self.prefix(),
                                                                      type=conn_type,
                                                                      options=options,
                                                                      flip_vertical=flip_vertical,
                                                                      chunk=False)
                    _ = self._stub.SubscribeImages(image_options, metadata=self.metadata())
                    # start the local server
                    self._shmem_client = ensight_grpc_shmem.stream_create(self._shmem_filename)
                    # turn on the polling thread
                    self._image_thread = threading.Thread(target=self.poll_images)
                    self._image_thread.daemon = True
                    self._image_thread.start()
                    return
            except Exception as e:
                print("Unable to subscribe to an image stream via shared memory: {}".format(str(e)))

        self.start_sub_service()
        conn_type = ensight_pb2.SubscribeImageOptions.GRPC
        options = dict(uri=self._sub_service._uri)
        image_options = ensight_pb2.SubscribeImageOptions(prefix=self.prefix(),
                                                          type=conn_type,
                                                          options=options,
                                                          flip_vertical=flip_vertical,
                                                          chunk=True)
        _ = self._stub.SubscribeImages(image_options, metadata=self.metadata())

    def unsubscribe(self):
        """Unsubscribe from any image or event streams that have been subscribed to.

        Only acts when connected and a local subscription service exists.
        """
        if self.is_connected():
            if self._sub_service is not None:
                prefix = ensight_pb2.Prefix(prefix=self.prefix())
                self._stub.Unsubscribe(prefix, metadata=self.metadata())
                self._sub_service = None

    @classmethod
    def find_filename(cls, size=1024*1024*25):
        """Create a zero-filled file for shared-memory image transport.

        Tries up to 100 candidate names of the form ``shmem_<pid+i>.bin`` in
        the current working directory.

        Args:
            size: Number of zero bytes to write (default 25MB).

        Returns:
            The path of the created file, or ``None`` if no candidate could
            be created.
        """
        for i in range(100):
            filename = os.path.join(os.getcwd(), "shmem_{}.bin".format(os.getpid()+i))
            try:
                # 'x' creates the file exclusively, so an existing file (ours
                # or another process's) is never overwritten.
                with open(filename, "xb") as tmp:
                    tmp.write(b'\0'*size)
            except FileExistsError:
                continue
            except OSError:
                # could not create or fill this candidate; do not leave a
                # partial file behind, then try the next name
                try:
                    os.remove(filename)
                except OSError:
                    pass
                continue
            return filename
        return None
553
554
555
class EnSightSubServicer(ensight_pb2_grpc.EnSightSubscriptionServicer):
    """Local gRPC servicer that receives published events and images.

    Received events and images are handed to the ``parent`` object through
    its ``put_event`` / ``put_image`` methods.
    """

    def __init__(self, parent=None):
        """Create the servicer.

        Args:
            parent: Object receiving ``put_event``/``put_image`` calls, or
                ``None`` to discard everything that is published.
        """
        self._server = None
        self._uri = ""
        self._parent = parent

    def PublishEvent(self, request, context):
        """Forward a published event to the parent and acknowledge it."""
        if self._parent is not None:
            self._parent.put_event(request)
        return ensight_pb2.GenericResponse(str='Event Published')

    def PublishImage(self, request_iterator, context):
        """Assemble a chunked image, forward it to the parent, acknowledge it.

        Chunks are read from ``request_iterator`` until one has ``final``
        set; width and height are taken from that last chunk.
        """
        img = next(request_iterator)
        # Collect chunks and join once rather than concatenating repeatedly
        # (assumes ``pixels`` is a bytes field).
        chunks = [img.pixels]
        while not img.final:
            img = next(request_iterator)
            chunks.append(img.pixels)
        the_image = dict(pixels=b"".join(chunks), width=img.width, height=img.height)
        if self._parent is not None:
            self._parent.put_image(the_image)
        return ensight_pb2.GenericResponse(str='Image Published')

    def start(self):
        """Start the gRPC server on localhost and record its URI in ``_uri``."""
        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        ensight_pb2_grpc.add_EnSightSubscriptionServicer_to_server(
            self, self._server)
        # Start the server on localhost with a random port
        port = self._server.add_insecure_port('localhost:0')
        self._uri = 'localhost:' + str(port)
        self._server.start()
599
600
Python binding for the EnSight client gRPC API.
Definition: ensight_grpc.py:35
def prefix(self)
return the unique prefix for this instance
def connect(self, timeout=15.0)
establish a connection to an EnSight gRPC server
def image_stream_enable(self, flip_vertical=False)
enable a simple gRPC-based image stream from EnSight
def event_stream_is_enabled(self)
check to see if the event stream is enabled
def put_image(self, the_image)
def set_security_token(self, n)
set the security token for the gRPC connection.
Definition: ensight_grpc.py:92
def event_stream_enable(self, callback=None, prefix=None)
enable a simple gRPC-based event stream from EnSight
def image_stream_is_enabled(self)
check to see if the image stream is enabled
def subscribe_events(self)
subscribe to an event stream using a locally launched gRPC server
def shutdown(self)
shut down all gRPC connections
def find_filename(cls, size=1024 *1024 *25)
def start_server(self)
Start an EnSight gRPC server instance.
def security_token(self)
return the security token for the gRPC connection.
Definition: ensight_grpc.py:98
def unsubscribe(self)
unsubscribe from any image or event streams that have been subscribed to
def get_image(self)
retrieve the current EnSight image
def render(self, width=640, height=480, aa=1, raw=False, highlighting=False)
render the current EnSight screen and return the image
def host(self)
get the hostname for this connection
Definition: ensight_grpc.py:74
def __init__(self, port=12345, host='127.0.0.1', version='')
create an instance of the EnSight gRPC interface wrapper
Definition: ensight_grpc.py:45
def is_connected(self)
check if a gRPC connection has been established
def subscribe_images(self, flip_vertical=False, use_shmem=True)
subscribe to an image stream
def stop_server(self)
shut down any gRPC connection made by this class
def put_event(self, evt)
def port(self)
get the port number for this connection
Definition: ensight_grpc.py:80
def geometry(self)
return the current scene geometry in glTF format
def get_event(self)
retrieve and remove the oldest ensightservice::EventReply string
def command(self, command_string, do_eval=True, json=False, raw_eval=False)
send a Python command string to be executed in EnSight
def dynamic_scene_graph_stream(self, client_cmds)
open up a dynamic scene graph stream

Connect with Ansys