nds2-client - Developer 0.16.7
Loading...
Searching...
No Matches
buffered_reader.hh
Go to the documentation of this file.
1//
2// Created by Jonathan Hanks on 4/15/17.
3//
4
5#ifndef NDS_PROXY_BUFFERED_READER_HH
6#define NDS_PROXY_BUFFERED_READER_HH
7
8#include <algorithm>
9#include <stdexcept>
10#include <vector>
11
12namespace nds_impl
13{
14 namespace Socket
15 {
16 template < typename Reader >
18 {
19 public:
20 explicit BufferedReader( Reader& r ) : r_( r ), buf_{}
21 {
22 }
23 // BufferedReader(Reader&& r): r_{std::move(r)}, buf_{} {}
24 // BufferedReader(Reader&& r): r_{std::move(r)} {}
25
26 // BufferedReader(BufferedReader<Reader, ReaderCaptureType>&&
27 // other): r_{std::move(other.r_)}, buf_{std::move(other.buf)} {}
28
34 void
35 write_all( const char* start, const char* end )
36 {
37 r_.write_all( start, end );
38 }
39
48 char*
49 read_available( char* start, char* end )
50 {
51 size_t len_requested = end - start;
52
53 if ( buf_.empty( ) )
54 fill_buffer( );
55 if ( buf_.size( ) < len_requested )
56 len_requested = buf_.size( );
57 std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
58 consume( len_requested );
59 return start + len_requested;
60 }
61
68 char*
69 read_exactly( char* start, const char* end )
70 {
71 size_t len_requested = end - start;
72
73 auto result = peek( start, end );
74 consume( len_requested );
75 return result;
76 }
77
88 char*
89 peek( char* start, const char* end )
90 {
91 size_t len_requested = end - start;
92
93 while ( buf_.size( ) < len_requested )
94 fill_buffer( );
95 std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
96 return start + len_requested;
97 }
98
110 template < typename It >
111 std::vector< char >
112 read_until( It begin_set, It end_set )
113 {
114 std::vector< char > result;
115
116 decltype( buf_.size( ) ) cur = 0;
117 auto remaining = buf_.size( );
118 for ( bool match = false; !match; ++cur, --remaining )
119 {
120 if ( remaining == 0 )
121 {
122 fill_buffer( );
123 remaining = buf_.size( ) - cur;
124 }
125
126 if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
127 end_set )
128 {
129 match = true;
130 }
131 }
132 result.resize( cur );
133 std::copy(
134 buf_.begin( ), buf_.begin( ) + cur, result.begin( ) );
135 consume( cur );
136 return result;
137 }
138
151 template < typename It, typename OutIt >
152 OutIt
153 read_until( It begin_set,
154 It end_set,
155 OutIt dest_start,
156 OutIt dest_end )
157 {
158 decltype( buf_.size( ) ) cur = 0;
159 auto remaining = buf_.size( );
160 OutIt dest_cur = dest_start;
161 bool match = false;
162 for ( ; !match && dest_cur != dest_end;
163 ++cur, --remaining, ++dest_cur )
164 {
165 if ( remaining == 0 )
166 {
167 fill_buffer( );
168 remaining = buf_.size( ) - cur;
169 }
170
171 if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
172 end_set )
173 {
174 match = true;
175 }
176 }
177 if ( match )
178 {
179 std::copy( buf_.begin( ), buf_.begin( ) + cur, dest_start );
180 }
181 else
182 {
183 throw std::range_error(
184 "Unable to find matching sequence" );
185 }
186 consume( cur );
187 return dest_cur;
188 }
189
194 std::vector< char >::size_type
196 {
197 return buf_.size( );
198 }
199
200 private:
201 Reader& r_;
202 std::vector< char > buf_;
203
204 void
206 {
207 const decltype( buf_.size( ) ) chunk_size = 1024;
208 auto data_size = buf_.size( );
209
210 buf_.resize( data_size + chunk_size );
211 auto data_start = buf_.data( ) + data_size;
212
213 auto result =
214 r_.read_available( data_start, data_start + chunk_size );
215
216 auto data_read_in =
217 static_cast< size_t >( result - data_start );
218 if ( data_read_in > 0 )
219 {
220 buf_.resize( data_size + data_read_in );
221 }
222 else
223 {
224 buf_.resize( data_size );
225 throw std::runtime_error( "io operation failed" );
226 }
227 }
228
229 void
230 consume( std::vector< char >::size_type count )
231 {
232 if ( count <= 0 )
233 return;
234 if ( count >= buf_.size( ) )
235 {
236 buf_.clear( );
237 return;
238 }
239 auto current_size = buf_.size( );
240 auto new_head = buf_.data( ) + count;
241
242 std::move(
243 new_head, buf_.data( ) + current_size, buf_.data( ) );
244 buf_.resize( current_size - count );
245 }
246 };
247
248 template < typename Reader >
250 {
251 Reader r_;
252 std::vector< char > buf_;
253
254 void
256 {
257 const decltype( buf_.size( ) ) chunk_size = 1024;
258 auto data_size = buf_.size( );
259
260 buf_.resize( data_size + chunk_size );
261 auto data_start = buf_.data( ) + data_size;
262
263 auto result =
264 r_.read_available( data_start, data_start + chunk_size );
265
266 auto data_read_in =
267 static_cast< size_t >( result - data_start );
268 if ( data_read_in > 0 )
269 {
270 buf_.resize( data_size + data_read_in );
271 }
272 else
273 {
274 buf_.resize( data_size );
275 throw std::runtime_error( "io operation failed" );
276 }
277 }
278
279 void
280 consume( std::vector< char >::size_type count )
281 {
282 if ( count <= 0 )
283 return;
284 if ( count >= buf_.size( ) )
285 {
286 buf_.clear( );
287 return;
288 }
289 auto current_size = buf_.size( );
290 auto new_head = buf_.data( ) + count;
291
292 std::move(
293 new_head, buf_.data( ) + current_size, buf_.data( ) );
294 buf_.resize( current_size - count );
295 }
296
297 public:
298 explicit OwningBufferedReader( Reader&& r )
299 : r_{ std::move( r ) }, buf_{}
300 {
301 }
302 // BufferedReader(Reader&& r): r_{std::move(r)}, buf_{} {}
303 // BufferedReader(Reader&& r): r_{std::move(r)} {}
304
305 // BufferedReader(BufferedReader<Reader, ReaderCaptureType>&&
306 // other): r_{std::move(other.r_)}, buf_{std::move(other.buf)} {}
307
313 void
314 write_all( const char* start, const char* end )
315 {
316 r_.write_all( start, end );
317 }
318
327 char*
328 read_available( char* start, char* end )
329 {
330 size_t len_requested = end - start;
331
332 if ( buf_.empty( ) )
333 fill_buffer( );
334 if ( buf_.size( ) < len_requested )
335 len_requested = buf_.size( );
336 std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
337 consume( len_requested );
338 return start + len_requested;
339 }
340
347 char*
348 read_exactly( char* start, const char* end )
349 {
350 size_t len_requested = end - start;
351
352 while ( buf_.size( ) < len_requested )
353 fill_buffer( );
354 std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
355 consume( len_requested );
356 return start + len_requested;
357 }
358
370 template < typename It >
371 std::vector< char >
372 read_until( It begin_set, It end_set )
373 {
374 std::vector< char > result;
375
376 decltype( buf_.size( ) ) cur = 0;
377 auto remaining = buf_.size( );
378 for ( bool match = false; !match; ++cur, --remaining )
379 {
380 if ( remaining == 0 )
381 {
382 fill_buffer( );
383 remaining = buf_.size( ) - cur;
384 }
385
386 if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
387 end_set )
388 {
389 match = true;
390 }
391 }
392 result.resize( cur );
393 std::copy(
394 buf_.begin( ), buf_.begin( ) + cur, result.begin( ) );
395 consume( cur );
396 return result;
397 }
398
403 std::vector< char >::size_type
405 {
406 return buf_.size( );
407 }
408 };
409 }
410}
411
413// Tests only after this point.
415
416#ifdef _NDS_IMPL_ENABLE_CATCH_TESTS_
417
418#include <algorithm>
419#include <iterator>
420#include <sstream>
421
422#include "socket/socket.hh"
423#include "tests/dummy_socket.hh"
424#include "catch.hpp"
425
426TEST_CASE( "Create a buffered socket", "[buffered,create]" )
427{
430}
431
432TEST_CASE( "You can write through a buffered reader", "[buffered,write]" )
433{
434 using namespace nds_testing;
435
438
439 auto hello = std::string( "hello" );
440 b.write_all( hello.data( ), hello.data( ) + hello.length( ) );
441 REQUIRE( s.str( ) == hello );
442
443 auto world = std::string( " world!" );
444 b.write_all( world.data( ), world.data( ) + world.length( ) );
445 REQUIRE( s.str( ) == hello + world );
446}
447
448TEST_CASE( "You can read from a buffered writter", "[buffered,read]" )
449{
450 using namespace nds_testing;
451
452 std::vector< char > dest( 20 );
453
454 auto hello = std::string{ "hello world!" };
455 auto s = DummySocket( hello );
456
458
459 REQUIRE( b.__buffered_bytes( ) == 0 );
460
461 auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
462 REQUIRE( b.__buffered_bytes( ) == 0 );
463 REQUIRE( end != dest.data( ) );
464 REQUIRE( end == dest.data( ) + hello.size( ) );
465 auto output = std::string( dest.data( ), end );
466 REQUIRE( output == hello );
467}
468
469TEST_CASE( "You can read the data in small segments from a buffered reader",
470 "[buffered,read]" )
471{
472 using namespace nds_testing;
473
474 std::vector< char > dest( 5 );
475
476 auto hello = std::string{ "hello world!" };
477 auto s = DummySocket( hello );
478
480
481 REQUIRE( b.__buffered_bytes( ) == 0 );
482
483 auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
484 REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - dest.size( ) ) );
485 REQUIRE( end != dest.data( ) );
486 REQUIRE( end == dest.data( ) + dest.size( ) );
487 auto output = std::string( dest.data( ), end );
488 REQUIRE( output == hello.substr( 0, dest.size( ) ) );
489
490 end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
491 REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - 2 * dest.size( ) ) );
492 REQUIRE( end != dest.data( ) );
493 REQUIRE( end == dest.data( ) + dest.size( ) );
494 output = std::string( dest.data( ), end );
495 REQUIRE( output == hello.substr( dest.size( ), dest.size( ) ) );
496}
497
498TEST_CASE( "You can ask for an exact amount of bytes to be returned",
499 "[buffered,read]" )
500{
501 using namespace nds_testing;
502
503 std::vector< char > dest( 10 );
504
505 auto input = std::string{ "0123456789" };
506 auto s = DummySocket( input );
508
509 auto stride = 4;
510 auto end = b.read_exactly( dest.data( ), dest.data( ) + stride );
511 REQUIRE( b.__buffered_bytes( ) == ( input.size( ) - stride ) );
512 REQUIRE( end == dest.data( ) + stride );
513
514 REQUIRE_THROWS(
515 b.read_exactly( dest.data( ), dest.data( ) + dest.size( ) ) );
516}
517
518TEST_CASE( "You can peek at an exact amount of data without consuming it",
519 "[buffered,read]" )
520{
521 using namespace nds_testing;
522
523 std::vector< char > dest( 10 );
524 auto input = std::string{ "0123456789" };
525
526 auto s = DummySocket( input );
528
529 auto stride = 4;
530 auto end = b.peek( dest.data( ), dest.data( ) + stride );
531 REQUIRE( b.__buffered_bytes( ) == input.size( ) );
532 REQUIRE( end == dest.data( ) + stride );
533 REQUIRE( std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
534
535 // clear the buffer
536 std::fill( dest.begin( ), dest.end( ), 0 );
537 REQUIRE(
538 !std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
539
540 end = b.read_exactly( dest.data( ), dest.data( ) + stride );
541 REQUIRE( b.__buffered_bytes( ) == input.size( ) - stride );
542 REQUIRE( end == dest.data( ) + stride );
543 REQUIRE( std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
544}
545
546TEST_CASE( "You can ask a buffered read to read until a sequence is found",
547 "[buffered,read_until]" )
548{
549 using namespace nds_testing;
550
551 auto input = std::string{ "0123456789" };
552 auto s = DummySocket( input );
554
555 auto terminators = std::string( "59" );
556 auto result = b.read_until( terminators.begin( ), terminators.end( ) );
557 REQUIRE( std::string( result.begin( ), result.end( ) ) ==
558 std::string( "012345" ) );
559}
560
561TEST_CASE( "You can ask a buffered read to read until a sequence is found, "
562 "with a bound",
563 "[buffered,read_until]" )
564{
565 using namespace nds_testing;
566
567 std::vector< char > dest( 5 );
568 auto input = std::string{ "0123456789abcdef" };
569 auto s = DummySocket( input );
571
572 std::fill( dest.begin( ), dest.end( ), 0 );
573 auto terminators = std::string( "29" );
574 auto end = b.read_until(
575 terminators.begin( ), terminators.end( ), dest.begin( ), dest.end( ) );
576 auto length = std::distance( dest.begin( ), end );
577 REQUIRE( length == 3 );
578 REQUIRE( std::string( dest.data( ), length ) == std::string( "012" ) );
579}
580
581TEST_CASE( "A bounded buffered read_until will throw an exception if it cannot "
582 "find a sequence",
583 "[buffered,read_until]" )
584{
585 using namespace nds_testing;
586
587 std::vector< char > dest( 5 );
588 auto input = std::string{ "0123456789abcdef" };
589 auto s = DummySocket( input );
591
592 auto terminators = std::string( "Z" );
593 REQUIRE_THROWS_AS( b.read_until( terminators.begin( ),
594 terminators.end( ),
595 dest.begin( ),
596 dest.end( ) ),
597 std::range_error );
598}
599
600TEST_CASE( "A bounded buffered read_until will throw an exception if it cannot "
601 "find a sequence when input is empty",
602 "[buffered,read_until]" )
603{
604 using namespace nds_testing;
605
606 std::vector< char > dest( 5 );
607 auto input = std::string{ "" };
608 auto s = DummySocket( input );
610
611 auto terminators = std::string( "Z" );
612 REQUIRE_THROWS_AS( b.read_until( terminators.begin( ),
613 terminators.end( ),
614 dest.begin( ),
615 dest.end( ) ),
616 std::runtime_error );
617}
618
619TEST_CASE( "Create an owning buffered socket", "[buffered,create]" )
620{
623 std::move( s1 )
624 };
625}
626
627TEST_CASE( "You can write through a owning buffered reader",
628 "[buffered,write]" )
629{
630 using namespace nds_testing;
631
632 std::vector< char > buf;
633 RecordingDummySocket s( buf );
635 s ) };
636
637 auto hello = std::string( "hello" );
638 b.write_all( hello.data( ), hello.data( ) + hello.length( ) );
639 REQUIRE( std::string( buf.data( ), buf.size( ) ) == hello );
640
641 auto world = std::string( " world!" );
642 b.write_all( world.data( ), world.data( ) + world.length( ) );
643 REQUIRE( std::string( buf.data( ), buf.size( ) ) == hello + world );
644}
645
646TEST_CASE( "You can read from a owning buffered writter", "[buffered,read]" )
647{
648 using namespace nds_testing;
649
650 std::vector< char > dest( 20 );
651
652 auto hello = std::string{ "hello world!" };
653 auto s = DummySocket( hello );
654
656
657 REQUIRE( b.__buffered_bytes( ) == 0 );
658
659 auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
660 REQUIRE( b.__buffered_bytes( ) == 0 );
661 REQUIRE( end != dest.data( ) );
662 REQUIRE( end == dest.data( ) + hello.size( ) );
663 auto output = std::string( dest.data( ), end );
664 REQUIRE( output == hello );
665}
666
668 "You can read the data in small segments from an owning buffered reader",
669 "[buffered,read]" )
670{
671 using namespace nds_testing;
672
673 std::vector< char > dest( 5 );
674
675 auto hello = std::string{ "hello world!" };
676 auto s = DummySocket( hello );
677
679
680 REQUIRE( b.__buffered_bytes( ) == 0 );
681
682 auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
683 REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - dest.size( ) ) );
684 REQUIRE( end != dest.data( ) );
685 REQUIRE( end == dest.data( ) + dest.size( ) );
686 auto output = std::string( dest.data( ), end );
687 REQUIRE( output == hello.substr( 0, dest.size( ) ) );
688
689 end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
690 REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - 2 * dest.size( ) ) );
691 REQUIRE( end != dest.data( ) );
692 REQUIRE( end == dest.data( ) + dest.size( ) );
693 output = std::string( dest.data( ), end );
694 REQUIRE( output == hello.substr( dest.size( ), dest.size( ) ) );
695}
696
697TEST_CASE( "You can ask for an exact amount of bytes to be returned from an "
698 "owning buffered reader",
699 "[buffered,read]" )
700{
701 using namespace nds_testing;
702
703 std::vector< char > dest( 10 );
704
705 auto input = std::string{ "0123456789" };
706 auto s = DummySocket( input );
708
709 auto stride = 4;
710 auto end = b.read_exactly( dest.data( ), dest.data( ) + stride );
711 REQUIRE( b.__buffered_bytes( ) == ( input.size( ) - stride ) );
712 REQUIRE( end == dest.data( ) + stride );
713
714 REQUIRE_THROWS(
715 b.read_exactly( dest.data( ), dest.data( ) + dest.size( ) ) );
716}
717
719 "You can ask an owning buffered read to read until a sequence is found",
720 "[buffered,read_until]" )
721{
722 using namespace nds_testing;
723
724 std::vector< char > dest( 10 );
725
726 auto input = std::string{ "0123456789" };
727 auto s = DummySocket( input );
729
730 auto terminators = std::string( "59" );
731 auto result = b.read_until( terminators.begin( ), terminators.end( ) );
732 REQUIRE( std::string( result.begin( ), result.end( ) ) ==
733 std::string( "012345" ) );
734}
735
736#endif // _NDS_IMPL_ENABLE_CATCH_TESTS_
737
738#endif // NDS_PROXY_BUFFERED_READER_HH
Definition buffered_reader.hh:18
std::vector< char >::size_type __buffered_bytes() const
Definition buffered_reader.hh:195
char * read_exactly(char *start, const char *end)
Definition buffered_reader.hh:69
char * peek(char *start, const char *end)
Definition buffered_reader.hh:89
std::vector< char > buf_
Definition buffered_reader.hh:202
void fill_buffer()
Definition buffered_reader.hh:205
void write_all(const char *start, const char *end)
Definition buffered_reader.hh:35
std::vector< char > read_until(It begin_set, It end_set)
Definition buffered_reader.hh:112
Reader & r_
Definition buffered_reader.hh:201
char * read_available(char *start, char *end)
Definition buffered_reader.hh:49
void consume(std::vector< char >::size_type count)
Definition buffered_reader.hh:230
OutIt read_until(It begin_set, It end_set, OutIt dest_start, OutIt dest_end)
Definition buffered_reader.hh:153
BufferedReader(Reader &r)
Definition buffered_reader.hh:20
Definition socket.hh:109
Definition buffered_reader.hh:250
char * read_exactly(char *start, const char *end)
Definition buffered_reader.hh:348
void consume(std::vector< char >::size_type count)
Definition buffered_reader.hh:280
std::vector< char >::size_type __buffered_bytes() const
Definition buffered_reader.hh:404
void write_all(const char *start, const char *end)
Definition buffered_reader.hh:314
std::vector< char > read_until(It begin_set, It end_set)
Definition buffered_reader.hh:372
void fill_buffer()
Definition buffered_reader.hh:255
Reader r_
Definition buffered_reader.hh:251
OwningBufferedReader(Reader &&r)
Definition buffered_reader.hh:298
char * read_available(char *start, char *end)
Definition buffered_reader.hh:328
std::vector< char > buf_
Definition buffered_reader.hh:252
Definition span_reader.hh:14
Definition dummy_socket.hh:13
Definition dummy_socket.hh:46
Definition dummy_socket.hh:15
std::string str()
Definition dummy_socket.hh:39
TEST_CASE("daq_strlcpy copies strings safely when buffers are sufficiently large")
Definition test_bsd_string.cc:9