Skip to main content

Post-processing tools 2023 R2

test_dvs_server.cpp

Last update: 17.04.2023
Go to the documentation of this file.
1 /* *************************************************************
2  * Copyright 2017-2022 ANSYS, Inc.
3  * All Rights Reserved.
4  *
5  * Restricted Rights Legend
6  *
7  * Use, duplication, or disclosure of this
8  * software and its documentation by the
9  * Government is subject to restrictions as
10  * set forth in subdivision [(b)(3)(ii)] of
11  * the Rights in Technical Data and Computer
12  * Software clause at 52.227-7013.
13  * *************************************************************
14  */
15 
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <stdint.h>
33 #include <stdarg.h>
34 
35 #include "dvs_server_interface.h"
37 #include "logger.h"
38 
39 #include <atomic>
40 #include <chrono>
41 #include <mutex>
42 #include <thread>
43 #include <vector>
44 #include <signal.h>
45 
46 static std::atomic<bool> s_terminate{false};
47 static std::mutex s_server_mutex;
48 static DVS::IServer* s_server{nullptr};
49 
55 void signal_callback_handler(int signum)
56 {
57  fprintf(stdout, "Caught Signal: %i\n", signum);
58  s_terminate = true;
59  std::lock_guard<std::mutex> lock(s_server_mutex);
60  if (s_server)
61  {
62  s_server->terminating();
63  }
64 }
65 
73 int main(int argc, char** argv)
74 {
75  //Register a signal handler instead of just killing the app so
76  //we can shutdown gracefully
77  signal(SIGINT, signal_callback_handler);
78 
79  uint32_t port = 50055;
80  uint32_t delay_ms = 500;
81  char host[512] = {0};
82  strcpy(host, "127.0.0.1");
83  char protocol[10] = {0};
84  strcpy(protocol, "grpc");
85 
86  char secret[255] = {0};
87  char cache_uri[512] = {0};
88  char dvs_file_loc[1024] = {0};
89  uint32_t debug_wait_sec = 0;
90  uint32_t server_number = 0;
91  uint32_t local_ranks = 1;
92  uint32_t server_verbosity = 3;
93  uint32_t timeout_sec = 15;
94 
95  uint32_t i = 1;
96  while (i < argc)
97  {
98  if ((strcmp(argv[i], "-p") == 0 ) && (i < argc - 1))
99  {
100  i++;
101  port = atoi(argv[i]);
102  }
103  else if ((strcmp(argv[i], "-h") == 0) && (i < argc - 1))
104  {
105  i++;
106  strncpy(host, argv[i], 255);
107  }
108  else if ((strcmp(argv[i], "-protocol") == 0) && (i < argc - 1))
109  {
110  i++;
111  strncpy(protocol, argv[i], 10);
112  if (strcmp(protocol, "grpc") != 0 && strcmp(protocol, "null") != 0 )
113  {
114  fprintf(stderr, "protocol: %s invalid\n", protocol);
115  exit(1);
116  }
117  }
118  else if ((strcmp(argv[i], "-d") == 0) && (i < argc - 1))
119  {
120  i++;
121  delay_ms = atoi(argv[i]);
122  }
123  else if (strcmp(argv[i], "-secret") == 0)
124  {
125  i++;
126  strncpy(secret, argv[i], 255);
127  }
128  else if (strcmp(argv[i], "-server") == 0 && (i < argc - 3))
129  {
130  i++;
131  server_number = atoi(argv[i++]);
132  local_ranks = atoi(argv[i++]);
133  server_verbosity = atoi(argv[i]);
134  }
135  else if (strcmp(argv[i], "-cache_uri") == 0)
136  {
137  i++;
138  strncpy(cache_uri, argv[i], 512);
139  }
140  else if (strcmp(argv[i], "-debug_wait") == 0)
141  {
142  i++;
143  debug_wait_sec = atoi(argv[i]);
144  }
145  else if (strcmp(argv[i], "-timeout") == 0)
146  {
147  i++;
148  timeout_sec = atoi(argv[i]);
149  }
150  else if (strcmp(argv[i], "-dvs_file") == 0)
151  {
152  i++;
153  strncpy(dvs_file_loc, argv[i], sizeof(dvs_file_loc)-1);
154  }
155  else
156  {
157  fprintf(stderr, "Unknown option: %s\n", argv[i]);
158  fprintf(stderr, "Usage: %s [-p port] [-h host] [-d ms delay] [-s dx dy dz] \n", argv[0]);
159  fprintf(stderr, "Options:\n");
160  fprintf(stderr, " -p port Server port to connect to. Default: %u\n", port);
161  fprintf(stderr, " -h host Server hostname to connect to. Default: %s\n", host);
162  fprintf(stderr, " -protocol str Server protocol. I.E. grpc or null. Default: grpc");
163  fprintf(stderr, " -d delay(ms) Milliseconds to delay between server updates. Default: %u\n", delay_ms);
164  fprintf(stderr, " -secret string Shared secret to use when talking with server Default: no shared secret\n");
165  fprintf(stderr, " -server n r v Start a server using server number [n], expecting [r] local ranks for DVS connections to connect to with verbosity [v]. Local Rank Min/Max: 1/1000\n");
166  fprintf(stderr, " -cache_uri str The URI for the server to use for the cache. Default: No cache\n");
167  fprintf(stderr, " -dvs_file str Set a location for the dvs files to be created Default: ./\n");
168  fprintf(stderr, " -timeout sec Shutdown the server if no new data has been received within timeout. Default: %u sec\n", timeout_sec);
169  fprintf(stderr, " -debug_wait s Wait for [s] for debugging\n");
170  exit(1);
171  }
172  i++;
173  }
174 
175  //This is just used for debugging/attaching to the app if some problem arises
176  if (debug_wait_sec > 0) {
177  std::this_thread::sleep_for(std::chrono::seconds(debug_wait_sec));
178  }
179 
180  char uri[512] = {0};
181  sprintf(uri, "%s://%s:%u", protocol, host, port);
182  //The server URI is used to setup the protocol host and port of the server
183  //(where and how clients need to connect). It will be something like:
184  //grpc:://localhost:50055
185  fprintf(stderr, "Server URI: %s\n", uri);
186  fprintf(stderr, "Update Delay(ms): %u\n", delay_ms);
187 
188  {
189  std::lock_guard<std::mutex> lock(s_server_mutex);
190  s_server = DVS::CREATE_SERVER_INSTANCE(uri);
191  }
192  if (s_server == nullptr)
193  {
194  fprintf(stderr, "Failed to create server\n");
195  exit(1);
196  }
197  char temp[10];
198  snprintf(temp, 10, "%u", server_verbosity);
199  s_server->set_option("VERBOSE",temp);
200  if (strlen(cache_uri) > 0)
201  {
202  //If not using a cache_uri the cache will be all in memory, not
203  //really useful for this specific test server app but showing a simple example.
204  //In the future you will be able to use the DVS Reader API to read data from
205  //memory using this instead of needing to read data from the cache.
206 
207  //The cache URI should be something like: hdf5://localhost/D:/my_cache
208  s_server->set_option("CACHE_URI",cache_uri);
209  }
210 
211  if (strlen(dvs_file_loc) > 0)
212  {
213  //The DVS file location let's you point the location of the auto created
214  //.dvs files to another location, absolute or relative to the cache_uri location
215  s_server->set_option("DVS_FILE_LOCATION", dvs_file_loc);
216  }
217 
218  if (strlen(secret) > 0)
219  {
220  //This is needed if wanting to use a simple secret to allow clients
221  //to connect to you. The clients will also need this secret.
222  s_server->set_option("SERVER_SECURITY_SECRET", secret);
223  }
224 
225  //The startup_unthreaded call will start/initialize the server and block until
226  //a all clients have initialized the dataset and sent a single timestep. A call
227  //to s_server->terminating() can be seen above in the signal handler to allow you
228  //to exit this call early.
229  s_server->startup_unthreaded(server_number, local_ranks);
230 
231  //For this example we are going to quit early if number pending or number complete have not
232  //changed within timeout_sec seconds.
233  uint32_t num_pending = 0;
234  uint32_t num_complete = 0;
235  auto start_time = std::chrono::system_clock::now();
236 
237  while (!s_terminate) {
238  //This is the main control loop of the server. DVS::IServer::update() just
239  //needs to be called in a loop to handle the server state until the server
240  //should be terminated.
241  //Note: update() will flush data to the cache so shutting down / exiting mid
242  //update will likely cause cache corruption.
243  dvs_ret ret_val = s_server->update();
244  // Avoid burning up a CPU waiting for the I/O to complete
245  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
246 
247  uint32_t cur_num_pending = 0;
248  uint32_t cur_num_complete = 0;
249  s_server->get_timestep_count(cur_num_pending, cur_num_complete);
250  if (cur_num_pending == num_pending && cur_num_complete == num_complete) {
251  //If our values haven't changed check to see if we should time out
252  std::chrono::duration<double> elapsed_seconds = std::chrono::system_clock::now() - start_time;
253  if (elapsed_seconds.count() > timeout_sec) {
254  //Terminate the server and quit out
255  s_server->terminating();
256  s_terminate = true;
257  }
258  }
259  else {
260  //We are different so reset our counters
261  start_time = std::chrono::system_clock::now();
262  num_pending = cur_num_pending;
263  num_complete = cur_num_complete;
264  }
265  }
266 
267  num_pending = 1;
268  num_complete = 0;
269  while (num_pending > 0) {
270  //This loop may or may not need to be done by a application. But is here
271  //to show potential cleanup when terminating early. If terminating you may
272  //want to finish the current in flight timestep. Since the server has been
273  //told it is terminating in the signal handler above it will not accept new
274  //timesteps but will allow pending timesteps to still complete.
275 
276  //Note: A user probably will want to do more due dilligence than this when
277  //shutting down. A timestep that is waiting on a client that is hung or crashed
278  //will never flip from pending->complete currently. This could cause this loop
279  //to be infinite waiting on a client who will never send its data.
280  dvs_ret error = s_server->update();
281  if (error != DVS_NONE) {
282  break;
283  }
284  //If the update() call above isn't called in this loop num_pending will
285  //never change also calling this loop to be infinite. We must update the server
286  //to get timestep state changes.
287  error = s_server->get_timestep_count(num_pending, num_complete);
288  if (error != DVS_NONE) {
289  break;
290  }
291  // Avoid burning up a CPU waiting for the I/O to complete
292  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
293  }
294 
295  //Destroy the server on exit, guarding against the s_server destruction here
296  //in case the signal handler above is triggered.
297  std::lock_guard<std::mutex> lock(s_server_mutex);
298  DVS::DESTROY_SERVER_INSTANCE(s_server);
299  s_server = nullptr;
300 
301  return 0;
302 }
Interface class used to run a dynamic data server in a thread accepting incoming client connections.
virtual void set_option(const char *key, const char *value)=0
Set a specific option on the server, these are used during startup See See Server Options.
virtual dvs_ret terminating()=0
Call to begin terminating the server.
virtual dvs_ret get_timestep_count(uint32_t &num_pending, uint32_t &num_complete) const =0
return the current number of pending and complete timesteps in the server
virtual dvs_ret startup_unthreaded(uint32_t server_number, uint32_t local_ranks)=0
Startup a server manually without threads.
virtual dvs_ret update()=0
Perform a server update.
C++ Server API for using Dynamic Visualization Store Server.
Contains enums used in C/C++ API.
int32_t dvs_ret
Return value of methods, TODO.
#define DVS_NONE
No detected error has occurred.
int main(int argc, char **argv)
Main method of test client application.
void signal_callback_handler(int signum)
Simple signal handler to handle signal.