nds2-client - User 0.16.7
Loading...
Searching...
No Matches
nds_iterate_handlers.hh
1//
2// Created by jonathan.hanks on 5/5/17.
3//
4
5#ifndef NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH
6#define NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH
7
8#include <algorithm>
9#include <limits>
10#include <memory>
11
12#include "nds_iterate_handler.hh"
13
14namespace NDS
15{
16 namespace detail
17 {
27 class iterate_fast_handler : public detail::iterate_handler
28 {
29 private:
30 buffer::gps_second_type cur_gps_;
31 buffer::gps_second_type gps_start_;
32 buffer::gps_second_type gps_stop_;
33 buffer::gps_second_type stride_;
34 bool online_;
35 buffers_type next_entry_;
36
37 template < typename T >
38 T
39 safe_add( T val1, T val2 )
40 {
41 static const T max_val = std::numeric_limits< T >::max( );
42 if ( val2 >= max_val - val1 )
43 {
44 return max_val;
45 }
46 return val1 + val2;
47 }
48
49 void
50 get_next_block( buffers_type& output )
51 {
52 auto parent = conn( );
53 if ( !parent )
54 {
55 throw std::runtime_error( "Connection object is null" );
56 }
57 parent->next_raw_buffer( output );
58 if ( gps_start_ == 0 )
59 {
60 gps_start_ = output.front( ).Start( );
61 gps_stop_ = safe_add< NDS::buffer::gps_second_type >(
62 gps_start_,
63 ( gps_stop_ == 0 ? 1893024016 : gps_stop_ ) );
64 parent->request_end_time( gps_stop_ );
65 }
66 if ( !output.empty( ) )
67 {
68 cur_gps_ = output.front( ).Stop( );
69 }
70 if ( cur_gps_ >= gps_stop_ && online_ )
71 {
72 parent->cycle_nds1_connection( );
73 }
74 }
75
76 public:
77 iterate_fast_handler(
78 buffer::gps_second_type gps_start,
79 buffer::gps_second_type gps_stop,
80 buffer::gps_second_type stride,
81 const connection::channel_names_type& channel_names,
82 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
83 : iterate_handler( std::move( parent ) ), cur_gps_( 0 ),
84 gps_start_( gps_start ), gps_stop_( gps_stop ),
85 stride_( stride ), online_( gps_start == 0 )
86 {
87 buffer::gps_second_type stop_time =
88 ( parent->protocol == NDS::connection::PROTOCOL_ONE &&
89 gps_start == 0
90 ? 0
91 : gps_stop_ );
92 parent->issue_iterate(
93 gps_start_, stop_time, stride, channel_names );
94 }
95 ~iterate_fast_handler( ) override = default;
96
97 bool
98 has_next( ) override
99 {
100 if ( next_entry_.size( ) > 0 )
101 {
102 return true;
103 }
104 if ( cur_gps_ >= gps_stop_ && gps_start_ != 0 )
105 {
106 return false;
107 }
108 try
109 {
110 get_next_block( next_entry_ );
111 }
112 catch ( ... )
113 {
114 return false;
115 }
116 return true;
117 }
118
119 void
120 next( buffers_type& output ) override
121 {
122 while ( has_next( ) )
123 {
124 if ( next_entry_.size( ) > 0 )
125 {
126 output.swap( next_entry_ );
127 next_entry_.clear( );
128 return;
129 }
130 get_next_block( next_entry_ );
131 }
132 }
133 };
134
145 class iterate_handler_with_simple_gaps : public detail::iterate_handler
146 {
147 typedef std::vector< NDS::channel >::iterator ch_vec_iter;
148 typedef std::vector< NDS::channel >::const_iterator
149 ch_vec_const_iter;
150
159 struct indexed_buffers
160 {
161
162 buffers_type bufs_;
163 buffer::gps_second_type cur_;
164 buffer::gps_second_type stride_;
165 std::vector<
166 std::unique_ptr< NDS::detail::delayed_gap_handler > >
167 delay_;
168
169 indexed_buffers( )
170 : bufs_( ), cur_( 0 ), stride_( 0 ), delay_( )
171 {
172 }
173
174 void
175 initialize( const std::vector< NDS::channel >& channel_list,
176 buffer::gps_second_type gps_time,
177 buffer::gps_second_type stride )
178 {
179 if ( bufs_.size( ) != channel_list.size( ) )
180 {
181 bufs_.resize( channel_list.size( ) );
182 }
183
184 for ( int i = 0; i < channel_list.size( ); ++i )
185 {
186 const NDS::channel& cur_ch = channel_list[ i ];
187 NDS::buffer& cur_buf = bufs_[ i ];
188
189 cur_buf.reset_channel_info( cur_ch, gps_time, 0 );
190 if ( cur_ch.Type( ) &
191 NDS::channel::CHANNEL_TYPE_MTREND )
192 {
193 cur_buf.resize(
194 stride /
195 static_cast< buffer::size_type >( 60 ) );
196 }
197 else
198 {
199 cur_buf.resize( stride *
200 static_cast< buffer::size_type >(
201 cur_ch.SampleRate( ) ) );
202 }
203 }
204 cur_ = gps_time;
205 stride_ = stride;
206 delay_.clear( );
207 }
208
215 bool
216 valid( ) const
217 {
218 if ( bufs_.size( ) == 0 )
219 {
220 return false;
221 }
222 if ( cur_ >= bufs_.front( ).Start( ) + stride_ )
223 {
224 return false;
225 }
226 return true;
227 }
228
235 void
236 reset( buffers_type& other )
237 {
238 bufs_.swap( other );
239 cur_ = 0;
240 stride_ = 0;
241 if ( bufs_.size( ) > 0 )
242 {
243 cur_ = bufs_.front( ).Start( );
244 stride_ = bufs_.front( ).Stop( ) - cur_;
245 }
246 delay_.clear( );
247 }
248
253 buffers_type&
254 bufs( )
255 {
256 return bufs_;
257 }
258
263 buffer::gps_second_type
264 cur( ) const
265 {
266 return cur_;
267 }
268
273 buffer::gps_second_type
274 start( ) const
275 {
276 if ( bufs_.size( ) == 0 )
277 {
278 return 0;
279 }
280 return bufs_.front( ).Start( );
281 }
282
289 buffer::gps_second_type
290 remaining( ) const
291 {
292 if ( bufs_.size( ) == 0 )
293 {
294 return 0;
295 }
296 return start( ) + stride_ - cur_;
297 }
298
303 void
304 advance( buffer::gps_second_type delta )
305 {
306 cur_ += delta;
307 }
308
321 buffer::gps_second_type
322 append_data_from( indexed_buffers& other )
323 {
324 if ( other.bufs_.size( ) != bufs_.size( ) )
325 {
326 return 0;
327 }
328 if ( other.cur( ) != cur( ) )
329 {
330 return 0;
331 }
332 buffer::gps_second_type secs_appended =
333 std::min< buffer::gps_second_type >(
334 remaining( ), other.remaining( ) );
335
336 buffer::gps_second_type src_offset_sec =
337 other.cur( ) - other.start( );
338 buffer::gps_second_type dest_offset_sec = cur( ) - start( );
339
340 for ( buffers_type::size_type i = 0; i < bufs_.size( );
341 ++i )
342 {
343 NDS::buffer& src_buf = other.bufs_[ i ];
344 NDS::buffer& dest_buf = bufs_[ i ];
345
346 auto src = const_cast< char* >(
347 reinterpret_cast< const char* >(
348 src_buf.cbegin< void >( ) ) );
349 src += src_buf.samples_to_bytes(
350 src_buf.seconds_to_samples( src_offset_sec, 0 ) );
351 auto dest = const_cast< char* >(
352 reinterpret_cast< const char* >(
353 dest_buf.cbegin< void >( ) ) );
354 dest += dest_buf.samples_to_bytes(
355 dest_buf.seconds_to_samples( dest_offset_sec, 0 ) );
356
357 buffer::size_type copy_bytes = src_buf.samples_to_bytes(
358 src_buf.seconds_to_samples( secs_appended, 0 ) );
359 std::copy( src, src + copy_bytes, dest );
360 }
361 advance( secs_appended );
362 other.advance( secs_appended );
363 return secs_appended;
364 }
365
373 void
374 apply_gap_handler( detail::gap_handler& handler,
375 buffer::gps_second_type gps_stop )
376 {
377 if ( gps_stop <= cur_ )
378 {
379 return;
380 }
381 buffer::gps_second_type start_time = start( );
382 if ( gps_stop > start_time + stride_ )
383 {
384 gps_stop = start_time + stride_;
385 }
386
387 for ( buffers_type::iterator cur_buf = bufs_.begin( );
388 cur_buf != bufs_.end( );
389 ++cur_buf )
390 {
391 std::unique_ptr< detail::delayed_gap_handler >
392 delayed_handler( handler.fill_gap(
393 *cur_buf,
394 cur_buf->seconds_to_samples( cur_ -
395 start_time ),
396 cur_buf->seconds_to_samples( gps_stop -
397 start_time ) ) );
398 if ( delayed_handler )
399 {
400 delay_.push_back( std::move( delayed_handler ) );
401 }
402 }
403 advance( gps_stop - cur_ );
404 }
405
406 void
407 apply_delayed_handlers( )
408 {
409 for ( std::vector< std::unique_ptr<
410 detail::delayed_gap_handler > >::iterator cur =
411 delay_.begin( );
412 cur != delay_.end( );
413 ++cur )
414 {
415 if ( *cur )
416 {
417 ( **cur )( );
418 }
419 }
420 delay_.clear( );
421 }
422 };
423
424 buffer::gps_second_type cur_gps_;
425 buffer::gps_second_type gps_start_;
426 buffer::gps_second_type gps_stop_;
427 buffer::gps_second_type stride_;
428 std::unique_ptr< detail::gap_handler > gap_handler_;
429 std::vector< NDS::channel > channel_list_;
430
431 indexed_buffers current_buffer_;
432 indexed_buffers pending_buffer_;
433
434 bool is_more_data_available_;
435
436 public:
437 iterate_handler_with_simple_gaps(
438 buffer::gps_second_type gps_start,
439 buffer::gps_second_type gps_stop,
440 buffer::gps_second_type stride,
441 const connection::channel_names_type& channel_names,
442 std::shared_ptr< NDS::detail::conn_p_type >&& parent,
443 std::unique_ptr< detail::gap_handler > ghandler )
444 : iterate_handler( std::move( parent ) ), cur_gps_( gps_start ),
445 gps_start_( gps_start ), gps_stop_( gps_stop ),
446 stride_( stride ), gap_handler_( std::move( ghandler ) ),
447 channel_list_( ), current_buffer_( ), pending_buffer_( ),
448 is_more_data_available_( true )
449 {
450 channel_list_.reserve( channel_names.size( ) );
451 parent->issue_iterate( gps_start_,
452 gps_stop_,
453 stride_,
454 channel_names,
455 &channel_list_ );
456 if ( stride_ == 0 )
457 {
458 stride_ = parent->calculate_stride(
459 gps_start_, gps_stop_, channel_list_ );
460 }
461 current_buffer_.initialize( channel_list_, cur_gps_, stride_ );
462 }
463 ~iterate_handler_with_simple_gaps( ) override = default;
464
465 bool
466 has_next( ) override
467 {
468 return conn( ) && ( cur_gps_ < gps_stop_ );
469 }
470
471 void
472 next( buffers_type& output ) override
473 {
474 auto parent = conn( );
475 if ( !has_next( ) || !parent )
476 {
477 throw std::out_of_range( "No Next" );
478 }
479
480 buffer::gps_second_type segment_end = cur_gps_ + stride_;
481 if ( current_buffer_.cur( ) > segment_end )
482 {
483 throw std::runtime_error(
484 "Impossible condition triggered, gap "
485 "handled iterator went beyond "
486 "bounds" );
487 }
488 while ( current_buffer_.cur( ) < segment_end )
489 {
490 /* loop doing one of:
491 * 1. take from the current pending_buffer_ if it is
492 * adjacent
493 * and valid
494 * 2. if pending is valid gap fill up to the end of the
495 * segment
496 * or the pending
497 * buffer, which ever is fist
498 * 3. if pending is not valid and there is more data fill
499 * pending
500 * 4. gap fill the remainder of the segment
501 */
502
503 if ( pending_buffer_.valid( ) )
504 {
505 if ( current_buffer_.cur( ) == pending_buffer_.cur( ) )
506 {
507 current_buffer_.append_data_from( pending_buffer_ );
508 }
509 else if ( current_buffer_.cur( ) <
510 pending_buffer_.cur( ) )
511 {
512 current_buffer_.apply_gap_handler(
513 *gap_handler_,
514 std::min( pending_buffer_.cur( ),
515 segment_end ) );
516 }
517 }
518 else if ( is_more_data_available_ )
519 {
520 try
521 {
522 buffers_type tmp;
523 parent->next_raw_buffer( tmp );
524 pending_buffer_.reset( tmp );
525 if ( pending_buffer_.bufs( ).size( ) == 0 )
526 {
527 is_more_data_available_ = false;
528 }
529 }
530 catch ( ... )
531 {
532 is_more_data_available_ = false;
533 }
534 }
535 else
536 {
537 current_buffer_.apply_gap_handler( *gap_handler_,
538 segment_end );
539 }
540 }
541 current_buffer_.apply_delayed_handlers( );
542 output.swap( current_buffer_.bufs( ) );
543 cur_gps_ += stride_;
544 if ( cur_gps_ + stride_ > gps_stop_ )
545 {
546 stride_ = gps_stop_ - cur_gps_;
547 }
548 current_buffer_.initialize( channel_list_, cur_gps_, stride_ );
549 }
550 };
551
561 class iterate_available_handler : public detail::iterate_handler
562 {
563 detail::request_fragments_type fragment_list_;
564 buffer::size_type cur_segment_;
565 buffer::gps_second_type cur_gps_;
566 buffer::gps_second_type gps_start_;
567 buffer::gps_second_type gps_stop_;
568 buffer::gps_second_type max_stride_;
569 channel::channel_names_type names_;
570 buffers_type next_entry_;
571
572 void
573 setup_next_step( NDS::detail::conn_p_type& parent )
574 {
575 NDS::detail::dout( ) << "setup_next_iterate_step()"
576 << std::endl;
577 if ( cur_gps_ == 0 )
578 {
579 NDS::detail::dout( ) << "Finding first segment"
580 << std::endl;
581 cur_segment_ = 0;
582 cur_gps_ = gps_start_;
583 }
584
585 if ( fragment_list_[ 0 ].time_spans.empty( ) )
586 {
587 cur_segment_++;
588 return;
589 }
590 simple_segment_list_type::value_type cur_segment =
591 ( fragment_list_[ 0 ].time_spans )[ cur_segment_ ];
592
593 NDS::detail::dout( ) << "status " << cur_gps_ << " "
594 << cur_segment.gps_start << "-"
595 << cur_segment.gps_stop << std::endl;
596 if ( cur_gps_ <= cur_segment.gps_start )
597 {
598 NDS::detail::dout( ) << "Starting a segment " << std::endl;
599 buffer::gps_second_type delta =
600 cur_segment.gps_stop - cur_segment.gps_start;
601 buffer::gps_second_type stride =
602 ( max_stride_ > delta ? delta : max_stride_ );
603 NDS::detail::dout( ) << cur_segment.gps_start << " "
604 << cur_segment.gps_stop << " "
605 << stride << std::endl;
606 parent.issue_iterate( cur_segment.gps_start,
607 cur_segment.gps_stop,
608 stride,
609 names_ );
610 }
611 else if ( cur_gps_ == cur_segment.gps_stop )
612 {
613 NDS::detail::dout( ) << "Ending a segment " << std::endl;
614 cur_segment_++;
615 if ( cur_segment_ >=
616 fragment_list_[ 0 ].time_spans.size( ) )
617 {
618 return;
619 }
620 simple_segment_list_type::value_type new_segment =
621 ( fragment_list_[ 0 ].time_spans )[ cur_segment_ ];
622 cur_gps_ = new_segment.gps_start;
623 buffer::gps_second_type delta =
624 new_segment.gps_stop - new_segment.gps_start;
625 buffer::gps_second_type stride =
626 ( max_stride_ > delta ? delta : max_stride_ );
627 NDS::detail::dout( ) << new_segment.gps_start << " "
628 << new_segment.gps_stop << " "
629 << stride;
630 parent.issue_iterate( new_segment.gps_start,
631 new_segment.gps_stop,
632 stride,
633 names_ );
634 }
635 else
636 {
637 NDS::detail::dout( ) << "Mid segment" << std::endl;
638 }
639 }
640
641 public:
642 iterate_available_handler(
643 buffer::gps_second_type gps_start,
644 buffer::gps_second_type gps_stop,
645 buffer::gps_second_type stride,
646 const channel::channel_names_type& channel_names,
647 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
648 : iterate_handler( std::move( parent ) ), fragment_list_( ),
649 cur_segment_( 0 ), cur_gps_( 0 ), gps_start_( gps_start ),
650 gps_stop_( gps_stop ), max_stride_( stride ),
651 names_( channel_names )
652 {
653 if ( parent->protocol == connection::PROTOCOL_ONE ||
654 gps_start == 0 )
655 {
656 NDS::detail::dout( ) << "Fast path" << std::endl;
657 // NDS1 and online data do not get special treatement
658 parent->issue_iterate(
659 gps_start, gps_stop, stride, channel_names );
660 return;
661 }
662 NDS::detail::dout( ) << "Planning fetches" << std::endl;
663
664 buffers_type retval;
665 retval.resize( channel_names.size( ) );
666 parent->plan_fetches(
667 gps_start_, gps_stop_, names_, retval, fragment_list_ );
668
669 // do not know how to deal with gaps that do not line up
670 if ( fragment_list_.size( ) != 1 )
671 {
672 throw connection::daq_error(
673 DAQD_NOT_FOUND,
674 "The requested channels have "
675 "different/non-identical gaps." );
676 }
677
678 setup_next_step( *( parent.get( ) ) );
679 };
680
681 ~iterate_available_handler( ) override = default;
682
683 bool
684 has_next( ) override
685 {
686 if ( next_entry_.size( ) > 0 )
687 {
688 return true;
689 }
690 auto parent = conn( );
691 if ( cur_gps_ >= gps_stop_ || !parent )
692 {
693 return false;
694 }
695 try
696 {
697 parent->next_raw_buffer( next_entry_ );
698 if ( next_entry_.size( ) == 0 )
699 {
700 return false;
701 }
702 cur_gps_ = next_entry_[ 0 ].Stop( );
703 setup_next_step( *parent );
704 }
705 catch ( ... )
706 {
707 return false;
708 }
709 return true;
710 }
711
712 void
713 next( buffers_type& output ) override
714 {
715 buffers_type retval;
716
717 if ( next_entry_.size( ) > 0 )
718 {
719 output.swap( next_entry_ );
720 next_entry_.clear( );
721 return;
722 }
723 auto parent = conn( );
724 if ( !parent )
725 {
726 throw( std::out_of_range( "No Next" ) );
727 }
728 parent->next_raw_buffer( retval );
729 if ( retval.empty( ) )
730 {
731 throw( std::out_of_range( "No Next" ) );
732 }
733 else
734 {
735 cur_gps_ = retval[ 0 ].Stop( );
736 setup_next_step( *parent );
737 }
738 output.swap( retval );
739 }
740 };
741
754 class iterate_full_handler : public detail::iterate_handler
755 {
756 buffer::gps_second_type gps_start_;
757 buffer::gps_second_type gps_stop_;
758 buffer::gps_second_type gps_stride_;
759 buffer::gps_second_type cur_gps_;
760 channel::channel_names_type names_;
761 channels_type channels_;
762 epoch prev_epoch_;
763
764 public:
765 iterate_full_handler(
766 buffer::gps_second_type gps_start,
767 buffer::gps_second_type gps_stop,
768 buffer::gps_second_type stride,
769 const channel::channel_names_type& channel_names,
770 epoch prev_epoch,
771 const channels_type& channel_list,
772 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
773 : iterate_handler( std::move( parent ) ),
774 gps_start_( gps_start ), gps_stop_( gps_stop ),
775 gps_stride_( stride ), cur_gps_( gps_start ),
776 names_( channel_names ), prev_epoch_( prev_epoch ),
777 channels_( channel_list ){
778
779 };
780
781 ~iterate_full_handler( ) override = default;
782
783 void
784 advance( )
785 {
786 cur_gps_ += gps_stride_;
787 };
788 bool
789 done( ) override
790 {
791 return ( gps_stride_ == 0 || cur_gps_ >= gps_stop_ );
792 };
793
794 bool
795 has_next( ) override
796 {
797 if ( !conn( ) )
798 {
799 return false;
800 }
801 if ( gps_start_ == 0 && gps_stop_ == 0 )
802 {
803 return true;
804 }
805 return !done( );
806 }
807
808 void
809 next( buffers_type& output ) override
810 {
811 buffers_type retval;
812
813 NDS::detail::dout( ) << "next_full" << std::endl;
814
815 auto parent = conn( );
816 if ( !parent )
817 {
818 throw std::out_of_range( "No next buffer" );
819 }
820 // fast path (online data)
821 if ( gps_start_ == 0 )
822 {
823 NDS::detail::dout( ) << "fast path" << std::endl;
824 parent->next_raw_buffer( retval );
825 }
826 else
827 {
828 NDS::detail::dout( )
829 << "Disabling request in progress in order to fetch"
830 << std::endl;
831 parent->request_in_progress( false );
832
833 buffer::gps_second_type start = cur_gps_;
834 buffer::gps_second_type end = gps_stop_;
835 buffer::gps_second_type stride = gps_stride_;
836 buffer::gps_second_type stop = start + stride;
837
838 if ( stop > end )
839 {
840 stop = end;
841 stride = stop - start;
842 }
843
844 NDS::detail::dout( ) << "Issuing fetch( " << start << ", "
845 << stop << " ... ) " << std::endl;
846 retval =
847 parent->fetch( start, stop, names_, &( channels_ ) );
848
849 advance( );
850 if ( done( ) )
851 {
852 NDS::detail::dout( ) << "iterate_full is complete"
853 << std::endl;
854 parent->set_epoch( prev_epoch_.gps_start,
855 prev_epoch_.gps_stop );
856 }
857 else
858 {
859 NDS::detail::dout( ) << "There is more to iterate, "
860 "request_in_progres enabled"
861 << std::endl;
862 parent->request_in_progress( true );
863 }
864 }
865 output.swap( retval );
866 }
867 };
868 }
869}
870
871#endif // NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH
A buffer holds the data contents of a channel.
Definition nds_buffer.hh:28
DLL_EXPORT size_type seconds_to_samples(gps_second_type offset_seconds, gps_nanosecond_type offset_nano=0) const
Convert relative second offsets to relative sample offsets.
Definition nds_buffer.hh:404
const T * cbegin() const
Return a constant data iterator for this buffer, that references the beginning of the data.
Definition nds_buffer.hh:170
DLL_EXPORT size_type samples_to_bytes(size_type offset_samples) const
Convert relative sample offsets to relative byte offsets.
Definition nds_buffer.hh:418
DLL_EXPORT void reset_channel_info(const channel &ChannelInfo, gps_second_type Second, gps_nanosecond_type NanoSecond)
Reset the channel type. Set the sample count to 0.
Definition nds_buffer.cc:64
DLL_EXPORT void resize(size_type N)
Resize the container to hold N elements.
Definition nds_buffer.hh:477
Represents a LIGO data channel.
Definition nds_channel.hh:34
DLL_EXPORT channel_type Type() const
Return the type of the channel.
Definition nds_channel.hh:313
DLL_EXPORT sample_rate_type SampleRate() const
Return the sample rate of the channel.
Definition nds_channel.hh:348
size_t size_type
Defines sizes when dealing with channels.
Definition nds_channel.hh:99
@ PROTOCOL_ONE
Connect with NDS1 protocol.
Definition nds_connection.hh:607
The NDS client namespace.
Definition debug_stream.cc:18