nds2-client - User 0.16.7
Loading...
Searching...
No Matches
nds_connection_ptype.hh
1#ifndef NDS_CONNECTION_DETAIL_PTYPE_HH
2#define NDS_CONNECTION_DETAIL_PTYPE_HH
3
4#include <cerrno>
5#include <map>
6#include <memory>
7#include <sstream>
8#include <vector>
9
10#include "debug_stream.hh"
11
12#include "daq_config.h"
13#include "daqc.h"
14#include "daqc_internal.h"
15#include "daqc_response.h"
16
17#include "nds_channel.hh"
18#include "nds_buffer.hh"
19
20#include "nds_gap_handler.hh"
21#include "nds_request_fragment.hh"
22#include "nds_iterate_handler.hh"
23
24#include "nds_parameter_block.hh"
25
26#include "nds_channel_cache.hh"
27
28namespace NDS
29{
30 namespace detail
31 {
32
37 class basic_channel_filter
38 {
39 public:
40 basic_channel_filter( channel::data_type data_type_mask,
41 channel::sample_rate_type min_sample_rate,
42 channel::sample_rate_type max_sample_rate )
43 : data_type_mask_( data_type_mask ),
44 min_sample_rate_( min_sample_rate ),
45 max_sample_rate_( max_sample_rate )
46 {
47 }
48 bool
49 operator( )( const channel& ch )
50 {
51 return (
52 ( ( ( ch.DataType( ) & data_type_mask_ ) != 0 ) ||
53 data_type_mask_ == NDS::channel::DEFAULT_DATA_MASK ) &&
54 ( ch.SampleRate( ) <= max_sample_rate_ ) &&
55 ( ch.SampleRate( ) >= min_sample_rate_ ) );
56 }
57
58 private:
59 channel::data_type data_type_mask_;
60 channel::sample_rate_type min_sample_rate_;
61 channel::sample_rate_type max_sample_rate_;
62 };
63
67 class push_back_channel
68 {
69 public:
70 push_back_channel( std::vector< channel >& buffer )
71 : buffer_( buffer )
72 {
73 }
74 void
75 operator( )( const channel& ch )
76 {
77 buffer_.push_back( ch );
78 }
79
80 private:
81 std::vector< channel >& buffer_;
82 };
83
87 class count_channels
88 {
89 public:
90 count_channels( ) : count_( 0 )
91 {
92 }
93 void
94 operator( )( const channel& ch )
95 {
96 ++count_;
97 }
98
99 size_t
100 count( ) const
101 {
102 return count_;
103 }
104
105 private:
106 size_t count_;
107 };
108 }
109
110 namespace detail
111 {
112
113 class buffer_initializer
114 {
115 public:
116 DLL_EXPORT
117 buffer_initializer( buffer::gps_second_type gps_start,
118 buffer::gps_second_type gps_stop )
119 : gps_start( gps_start ), gps_stop( gps_stop ){};
120
121 DLL_EXPORT
122 void reset_buffer( buffer* cur_buffer,
123 const channel& channel_info ) const;
124
125 private:
126 buffer::gps_second_type gps_start;
127 buffer::gps_second_type gps_stop;
128 };
129
130 class daq_accessor
131 {
132 public:
133 daq_accessor( daq_t& handle ) : handle_( handle ){};
134
135 daq_t*
136 operator( )( )
137 {
138 return &handle_;
139 };
140
141 private:
142 daq_t& handle_;
143 };
144
145 //---------------------------------------------------------------------
146 // conn_p_type
147 //---------------------------------------------------------------------
148 struct conn_p_type : public std::enable_shared_from_this< conn_p_type >
149 {
150 enum class iterate_finalize_reason
151 {
152 FINISHED,
153 ABORTED
154 };
155
156 typedef channel::channel_type channel_type;
157 typedef channel::data_type data_type;
158 typedef channel::sample_rate_type sample_rate_type;
159
160 typedef std::map< std::string, daq_channel_t >
161 channel_mem_cache_type;
162 typedef NDS::detail::channel_cache_nds1 channel_cache_type;
163 typedef long time_type;
164
165 typedef nds_socket_type socket_t;
166
167 connection::host_type host;
168 connection::port_type port;
169 connection::protocol_type protocol;
170 daq_t handle;
171 daq_accessor accesor_;
172 bool connected;
173 channel_cache_type channel_cache_;
174 channel_mem_cache_type channel_mem_cache_;
175
176 time_type request_start_time_;
177 time_type request_end_time_;
178 bool request_in_progress_;
179
180 NDS::parameters parameters_;
181
182 DLL_EXPORT
183 explicit conn_p_type( const NDS::parameters& params );
184
185 DLL_EXPORT
186 ~conn_p_type( );
187
188 void connect( );
189
190 availability_list_type get_availability(
191 const epoch& time_span,
192 const connection::channel_names_type& channel_names );
193 availability_list_type get_availability(
194 buffer::gps_second_type gps_start,
195 buffer::gps_second_type gps_stop,
196 const connection::channel_names_type& channel_names );
197
198 bool check( buffer::gps_second_type gps_start,
199 buffer::gps_second_type gps_stop,
200 const connection::channel_names_type& channel_names );
201
202 bool
203 has_gaps( buffer::gps_second_type gps_start,
204 buffer::gps_second_type gps_stop,
205 const connection::channel_names_type& channel_names );
206
207 buffers_type
208 fetch( buffer::gps_second_type gps_start,
209 buffer::gps_second_type gps_stop,
210 const connection::channel_names_type& channel_names,
211 channels_type* reference_channels = nullptr );
212
213 size_t count_channels_nds1(
214 std::string channel_glob,
215 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
216 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
217 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
218 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
219
220 size_t count_channels_nds2(
221 std::string channel_glob,
222 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
223 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
224 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
225 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
226
227 size_t
228 count_channels(
229 std::string channel_glob,
230 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
231 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
232 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
233 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE )
234 {
236 {
237 return count_channels_nds1( channel_glob,
238 channel_type_mask,
239 data_type_mask,
240 min_sample_rate,
241 max_sample_rate );
242 }
243 return count_channels_nds2( channel_glob,
244 channel_type_mask,
245 data_type_mask,
246 min_sample_rate,
247 max_sample_rate );
248 }
249
250 void find_channels( channels_type& output,
251 const NDS::channel_predicate_object& pred );
252
253 void find_channels_nds1(
254 channels_type& output,
255 std::string channel_glob,
256 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
257 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
258 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
259 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
260
261 void find_channels_nds2(
262 channels_type& output,
263 std::string channel_glob,
264 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
265 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
266 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
267 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
268
269 epochs_type get_epochs( );
270
271 bool set_epoch_if_changed( const epoch& time_span );
272 bool set_epoch( const std::string& epoch_name );
273
274 bool set_epoch( buffer::gps_second_type gps_start,
275 buffer::gps_second_type gps_stop );
276
277 epoch current_epoch( ) const;
278
279 const channel::hash_type& hash( ) const;
280
281 std::shared_ptr< detail::iterate_handler > dispatch_iterate(
282 buffer::gps_second_type gps_start,
283 buffer::gps_second_type gps_stop,
284 buffer::gps_second_type stride,
285 const connection::channel_names_type& channel_names );
286
287 void issue_iterate(
288 buffer::gps_second_type gps_start,
289 buffer::gps_second_type gps_stop,
290 buffer::gps_second_type stride,
291 const connection::channel_names_type& channel_names,
292 std::vector< NDS::channel >* final_channel_list = nullptr );
293
294 void finalize_iterate( detail::iterate_handler* handler,
295 iterate_finalize_reason reason );
296
297 // bool has_next();
298 //
299 // buffers_type next();
300
301 void next_raw_buffer( buffers_type& output );
302
303 void shutdown( );
304
305 void fill_gap( data_type DataType,
306 channel::size_type DataSizeType,
307 unsigned char* start,
308 unsigned char* end );
309
310 inline time_type
311 request_start_time( ) const
312 {
313 return request_start_time_;
314 }
315
316 inline void
317 request_start_time( time_type Value )
318 {
319 request_start_time_ = Value;
320 }
321
322 inline time_type
323 request_end_time( ) const
324 {
325 return request_end_time_;
326 }
327
328 inline void
329 request_end_time( time_type Value )
330 {
331 request_end_time_ = Value;
332 }
333
334 inline bool
335 request_in_progress( ) const
336 {
337 return request_in_progress_;
338 }
339
340 inline void
341 request_in_progress( bool Value )
342 {
343 request_in_progress_ = Value;
344 }
345
354 calculate_stride( NDS::buffer::gps_second_type gps_start,
355 NDS::buffer::gps_second_type gps_stop,
356 NDS::channels_type& selected_channels ) const;
357
358 inline void
359 termination_block( )
360 {
361 //-----------------------------------------------------------------
362 // NDS1 transfers end with a 'termination block', an empty block
363 // that is indistinguisable from a 'data not found' condition.
364 // If this is an NDS1 connection, we must digest the termination
365 // block.
366 //-----------------------------------------------------------------
367 if ( ( request_in_progress( ) ) &&
369 {
370 int rc = daq_recv_next( &handle );
371
372 if ( rc != DAQD_NOT_FOUND )
373 {
374 //-----------------------------------------------------
375 // Unexpected error
376 //-----------------------------------------------------
377 throw connection::daq_error( rc );
378 }
379 }
380 }
381
386 NDS::buffer::gps_second_type cur_nds1_gpstime( );
387
388 void validate( ) const;
389
390 void validate_daq( int RetCode ) const;
391
392 void infer_daq_channel_info( const std::string& channel_name,
393 daq_channel_t& channel,
394 time_type gps );
395
396 void channel_mask_to_query_type_strings(
397 channel_type channel_type_mask,
398 std::vector< std::string >& queryTypes );
399
400 // Helper functions for error messages
401 std::string get_last_message( ) const throw( );
402
403 std::string err_msg_unexpected_no_data_found(
404 buffer::gps_second_type gps_start,
405 buffer::gps_second_type gps_stop,
406 const channel::channel_names_type& names );
407
408 void
409 plan_fetches( buffer::gps_second_type gps_start,
410 buffer::gps_second_type gps_stop,
411 const connection::channel_names_type& channel_names,
412 buffers_type& dest_buffers,
413 request_fragments_type& retval );
414
415 void
416 sync_parameters( )
417 {
418 detail::parameter_accessor paccess( parameters_ );
419 if ( handle.conceal )
420 {
421 handle.conceal->max_command_count = static_cast< size_t >(
422 paccess( ).max_nds1_command_count( ) );
423 }
424 }
425
426 void cycle_nds1_connection( );
427
428 protected:
429 epoch resolve_epoch( const std::string& epoch_name );
430 void setup_daq_chanlist(
431 buffer::gps_second_type gps_start,
432 const connection::channel_names_type& channel_names,
433 bool& have_minute_trends,
434 double& bytes_per_sample,
435 std::vector< NDS::channel >* final_channel_list = nullptr );
436
437 void process_check_data_result( int result, bool gaps_ok );
438
439 void fetch_fragment( request_fragment& fragment,
440 const buffer_initializer& initializer,
441 bool buffers_initialized );
442
443 std::vector< buffers_type > fetch_available(
444 buffer::gps_second_type gps_start,
445 buffer::gps_second_type gps_stop,
446 const connection::channel_names_type& channel_names );
447
448 std::weak_ptr< detail::iterate_handler > iterate_handler_;
449
450 epochs_type epoch_cache_;
451 epoch current_epoch_;
452 channel::hash_type hash_;
453
454 void load_epochs_to_cache( );
455
456 channel _parse_nds2_get_channel_line( char* buffer );
457
458 bool _read_uint4( socket_t fd, uint4_type* dest );
459
460 bool _read_buffer( socket_t fd, void* dest, size_t size );
461
462 template < typename Filter, typename Function >
463 void retreive_channels_from_nds2(
464 const std::vector< std::string >& types,
465 const std::string& channel_glob,
466 Filter& filt,
467 Function& fn );
468
469 std::shared_ptr< detail::iterate_handler > iterate_simple_gaps(
470 buffer::gps_second_type gps_start,
471 buffer::gps_second_type gps_stop,
472 buffer::gps_second_type stride,
473 const connection::channel_names_type& channel_names );
474
475 std::shared_ptr< detail::iterate_handler >
476 iterate_fast( buffer::gps_second_type gps_start,
477 buffer::gps_second_type gps_stop,
478 buffer::gps_second_type stride,
479 const connection::channel_names_type& channel_names );
480
481 std::shared_ptr< detail::iterate_handler > iterate_available(
482 buffer::gps_second_type gps_start,
483 buffer::gps_second_type gps_stop,
484 buffer::gps_second_type stride,
485 const connection::channel_names_type& channel_names );
486
487 std::shared_ptr< detail::iterate_handler >
488 iterate_full( buffer::gps_second_type gps_start,
489 buffer::gps_second_type gps_stop,
490 buffer::gps_second_type stride,
491 const connection::channel_names_type& channel_names );
492
493 private:
494 static bool initialized;
495 };
496
497 template < typename Filter, typename Function >
498 void
499 conn_p_type::retreive_channels_from_nds2(
500 const std::vector< std::string >& types,
501 const std::string& channel_glob,
502 Filter& filter,
503 Function& fn )
504 {
505 std::vector< char > lineBuffer;
506 lineBuffer.resize( 512 );
507
508 for ( std::vector< std::string >::const_iterator cur =
509 types.begin( );
510 cur != types.end( );
511 ++cur )
512 {
513 NDS::detail::dout( ) << "Retreiving channels for type " << *cur
514 << std::endl;
515 {
516 std::ostringstream cmdBuf;
517 // cmdBuf << "` " << current_epoch_.gps_start << " " <<
518 // cur_type;
519 if ( current_epoch_.gps_stop ==
520 current_epoch_.gps_start + 1 )
521 {
522 cmdBuf << "get-channels " << current_epoch_.gps_start
523 << " ";
524 }
525 else
526 {
527 cmdBuf << "get-channels 0 ";
528 }
529 cmdBuf << *cur;
530 if ( channel_glob.compare( "*" ) != 0 )
531 {
532 cmdBuf << " {" << channel_glob << "}";
533 }
534 cmdBuf << ";\n";
535 validate_daq(
536 daq_send( &( handle ), cmdBuf.str( ).c_str( ) ) );
537 NDS::detail::dout( ) << "Sent command '" << cmdBuf.str( )
538 << "'" << std::endl;
539 }
540
541 socket_t fd = handle.conceal->sockfd;
542 uint4_type count = 0;
543 NDS::detail::dout( ) << "Reading channel count" << std::endl;
544 if ( !_read_uint4( fd, &count ) )
545 {
546 throw connection::unexpected_channels_received_error( );
547 }
548 // result.reserve(count);
549
550 NDS::detail::dout( ) << "Expecting " << count << "channels"
551 << std::endl;
552 for ( uint4_type i = 0; i < count; ++i )
553 {
554 uint4_type line_size = 0;
555 if ( !_read_uint4( handle.conceal->sockfd, &line_size ) )
556 {
557 throw connection::unexpected_channels_received_error( );
558 }
559 if ( line_size < lineBuffer.size( ) )
560 {
561 lineBuffer.resize( line_size );
562 }
563 if ( !_read_buffer( handle.conceal->sockfd,
564 &( lineBuffer[ 0 ] ),
565 line_size ) )
566 {
567 throw connection::unexpected_channels_received_error( );
568 }
569 channel curCh =
570 _parse_nds2_get_channel_line( &( lineBuffer[ 0 ] ) );
571 if ( filter( curCh ) )
572 {
573 fn( curCh );
574 }
575 }
576 }
577 }
578 } // namespace detail
579
580} // namespace NDS
581
582#endif // NDS_CONNECTION_DETAIL_PTYPE_HH
long gps_second_type
Type second portion of a gps time.
Definition nds_buffer.hh:33
A predicate object to be used in limiting the number of channels returned when searching for channels...
Definition nds_connection.hh:169
@ PROTOCOL_ONE
Connect with NDS1 protocol.
Definition nds_connection.hh:607
The parameters class holds the configuration for a connection. This includes gap handling strategy an...
Definition nds_connection.hh:930
The NDS client namespace.
Definition debug_stream.cc:18
channels_type find_channels(const NDS::parameters &params, const channel_predicate_object &pred)
Retrieve a list of channels without having to manage a connection.
Definition nds_standalone.cc:48
NDS::buffers_type fetch(const NDS::parameters &params, buffer::gps_second_type gps_start, buffer::gps_second_type gps_stop, const NDS::connection::channel_names_type &channel_names)
Retrieve data from the server without having to manage a connection.
Definition nds_standalone.cc:12