Bobox
Example of two-way merge box

This is a sample application (in the bobox/bofe/merge directory) that implements three boxes: a source of integers, a two-way merge box, and a box that prints its input to the console. These boxes are connected into the following model:

merge.png
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

#include "bobox_manager.hpp"
#include "bobox_runtime.hpp"
#include "bobox_request.hpp"
#include "bobox_basic_object_factory.hpp"
#include "bobox_basic_box.hpp"
#include "bobox_basic_box_utils.hpp"

// Source box: writes 100 integers to its output, either the even numbers
// 0, 2, ..., 198 or the odd numbers 1, 3, ..., 199, depending on the boolean
// box parameter "odd".
class source_box : public bobox::basic_box {
public:
        typedef generic_model<source_box, bobox::BST_STATEFUL> model;

        BOBOX_BOX_INPUTS_LIST(main,0);
        BOBOX_BOX_OUTPUTS_LIST(main,0);

        // box_params - forwarded to the basic_box base class
        // params     - box parameters; "odd" is read into odd_
        source_box(const box_parameters &box_params, const bobox::parameters_ptr_type &params)
                : bobox::basic_box(box_params)
                , odd_(false)   // well-defined default in case get_parameter() leaves odd_ untouched
        {
                params->get_parameter("odd", odd_);
        }

        // Prefetches the envelope on the single input.
        virtual void init_impl() BOBOX_OVERRIDE
        {
                prefetch_envelope(inputs::main());
        }

        // Consumes the input envelope (expected to be poisoned) and then emits
        // the whole integer sequence.
        virtual void sync_mach_etwas() BOBOX_OVERRIDE
        {
                // Pop outside of the assertion: if BOBOX_ASSERT is compiled out
                // (as assert-like macros usually are in release builds), a pop
                // placed inside it would silently disappear.
                envelope_ptr_type trigger = pop_envelope(inputs::main());
                BOBOX_ASSERT(trigger->is_poisoned());
                (void)trigger; // avoids an unused-variable warning when the assertion is disabled

                bobox::output_stream stream(this, output_to_outarc(outputs::main()));

                const int offset = odd_ ? 1 : 0;
                for (int i=0; i<100; i++) {
                        stream_line line = stream.next();
                        line.get_cell<int>(0) = i*2 + offset;
                }
                // NOTE(review): no explicit end-of-stream is sent here; presumably
                // output_stream finishes the output when it goes out of scope - confirm.
        }

private:
        bool odd_; // true: generate odd numbers, false: generate even numbers
};

// merges two sorted streams into one
class merge_box : public bobox::basic_box {
public:

        typedef generic_model<merge_box, bobox::BST_STATEFUL> model;

        BOBOX_BOX_INPUTS_LIST(left,0, right,1);
        BOBOX_BOX_OUTPUTS_LIST(main,0);

        merge_box(const box_parameters &box_params)
                : bobox::basic_box(box_params)
        {
        }

        virtual void init_impl() BOBOX_OVERRIDE
        {
                prefetch_envelope(inputs::left());
                prefetch_envelope(inputs::right());
        }

        virtual void sync_mach_etwas() BOBOX_OVERRIDE
        {
                bobox::input_stream left(this, input_to_inarc(inputs::left()));
                bobox::input_stream right(this, input_to_inarc(inputs::right()));
                bobox::output_stream output(this, output_to_outarc(outputs::main()));
                
                while (!left.eof() && !right.eof()) {
                        int l = left.current().get_cell<int>(0);
                        int r = right.current().get_cell<int>(0);

                        if (l <= r) {
                                output.next().get_cell<int>(0) = l;
                                left.move_next();
                        } else { 
                                output.next().get_cell<int>(0) = r;
                                right.move_next();
                        }
                }

                while (!left.eof()) {
                        output.next().get_cell<int>(0) = left.current().get_cell<int>(0);
                        left.move_next();
                }

                while (!right.eof()) {
                        output.next().get_cell<int>(0) = right.current().get_cell<int>(0);
                        right.move_next();
                }
        }
};

// Sink box: prints every integer from column 0 of the incoming data
// envelopes to standard output, one value per line.
class sink_box : public bobox::basic_box {
public:
        typedef generic_model<sink_box, bobox::BST_STATEFUL> model;

        BOBOX_BOX_INPUTS_LIST(main,0);
        BOBOX_BOX_OUTPUTS_LIST(main,0);

        // box_params - forwarded to the basic_box base class
        sink_box(const box_parameters &box_params)
                : bobox::basic_box(box_params)
        {
        }

        // Prefetches the envelope on the single input.
        virtual void init_impl() BOBOX_OVERRIDE
        {
                prefetch_envelope(inputs::main());
        }

        // Prints data envelopes until the first non-data envelope arrives,
        // then sends a poisoned envelope to the output.
        virtual void sync_mach_etwas() BOBOX_OVERRIDE
        {
                envelope_ptr_type envelope = pop_envelope(inputs::main());

                while (envelope->is_data()) {
                        for (size_t row = 0; row < envelope->get_size(); ++row) {
                                std::cout << envelope->get_column(column_index_type(0)).get_cell<int>(row) << std::endl;
                        }

                        envelope = pop_envelope(inputs::main());
                }

                send_poisoned(outputs::main());
        }
};

// Runtime of the example. It also serves as the object factory, so the
// boxes and types used by the model are registered here under the names
// that the model description refers to.
class runtime : public bobox::runtime, public bobox::basic_object_factory {
private:
        // Returns this object as the runtime.
        virtual bobox::runtime *get_runtime() BOBOX_OVERRIDE
        {
                return this;
        }

        // Registers the three box models and the only element type (int).
        virtual void init_impl() BOBOX_OVERRIDE
        {
                // box models
                register_box<source_box::model>(bobox::box_model_tid_type("Source"));
                register_box<sink_box::model>(bobox::box_model_tid_type("Sink"));
                register_box<merge_box::model>(bobox::box_model_tid_type("Merge"));

                // element types
                register_type<int>(bobox::type_tid_type("int"));
        }
};

// Compiles the example model, runs it as a single request, dumps the request
// to "dump.dot" and prints the textual result of the request.
//
// Returns EXIT_SUCCESS only when the request finished with RRT_OK; every
// other result yields EXIT_FAILURE so that callers can detect the failure.
// The command-line arguments are not used.
int main(int, char *[])
{
        bobox::manager mng;

        runtime rt;
        rt.init();

        // Model description: one broadcast box triggers both sources, whose
        // outputs are merged and passed on to the sink.
        std::string str(
                "model main<()><()> { "
                "       bobox::broadcast<()><(),()> broadcast; "
                "       Source<()><(int)> source1(odd=true), source2(odd=false); "
                "       Merge <(int),(int)><(int)> merge; "
                "       Sink <(int)><()> sink; "
                "       "
                "       input -> broadcast; "
                "       broadcast[0] -> source1; "
                "       broadcast[1] -> source2; "
                "       source1 -> [left]merge; "
                "       source2 -> [right]merge; "
                "       merge -> sink -> output; "
                "}");
        std::istringstream in(str);

        bobox::request_id_type rqid = mng.create_request(bobox::bobolang::compile(in, &rt));

#ifdef BOBOX_DEBUG
        // NOTE(review): box id 8 is hard-coded; presumably it is the merge box
        // of this model - confirm when the model changes.
        mng.capture_events(rqid, bobox::box_id_type(8), bobox::manager::ET_ALL);
        std::ofstream events("dump.ev");
        mng.set_event_stream(rqid, &events);
#endif

        mng.run_request(rqid);
        mng.wait_on_request(rqid);

        // The dump is only a diagnostic aid, so failing to open the file is
        // reported but does not abort the program.
        std::ofstream dot("dump.dot");
        if (dot) {
                mng.dump_request(rqid, dot, false, false);
        } else {
                std::cerr << "Cannot open dump.dot for writing" << std::endl;
        }

        // Anything but RRT_OK is reported as a failure through the exit code.
        int exit_code = EXIT_FAILURE;

        switch (mng.get_result(rqid)) {
                case bobox::RRT_ERROR: std::cout << "Error" << std::endl; break;
                case bobox::RRT_CANCELED: std::cout << "Canceled" << std::endl; break;
                case bobox::RRT_DEADLOCK: std::cout << "Deadlock" << std::endl; break;
                case bobox::RRT_MEMORY: std::cout << "Memory" << std::endl; break;
                case bobox::RRT_OK:
                        std::cout << "OK" << std::endl;
                        exit_code = EXIT_SUCCESS;
                        break;
                case bobox::RRT_TIMEOUT: std::cout << "Timeout" << std::endl; break;
                default: BOBOX_ASSERT(false); break;
        }

        mng.destroy_request(rqid);

        return exit_code;
}

The following is an example of the communication of the merge box:

Event #1 (8): Incoming envelope (0): #1 (0, 6)
        type: data
         size: 6
        scalars:<NULL>

        1 
        3 
        5 
        7 
        9 
        11 

Event #2 (8): Incoming envelope (1): #1 (0, 6)
        type: data
         size: 6
        scalars:<NULL>

        0 
        2 
        4 
        6 
        8 
        10 

Event #3 (8): Outgoing envelope (0): #1 (0, 6)
        type: data
         size: 6
        scalars:<NULL>

        0 
        1 
        2 
        3 
        4 
        5 

Event #4 (8): Incoming envelope (1): #2 (6, 18)
        type: data
         size: 12
        scalars:<NULL>

        12 
        14 
        16 
        18 
        20 
        22 
        24 
        26 
        28 
        30 
        32 
        34 

Event #5 (8): Incoming envelope (0): #2 (6, 18)
        type: data
         size: 12
        scalars:<NULL>

        13 
        15 
        17 
        19 
        21 
        23 
        25 
        27 
        29 
        31 
        33 
        35 

Event #6 (8): Outgoing envelope (0): #2 (6, 18)
        type: data
         size: 12
        scalars:<NULL>

        6 
        7 
        8 
        9 
        10 
        11 
        12 
        13 
        14 
        15 
        16 
        17 

Event #7 (8): Outgoing envelope (0): #3 (18, 27)
        type: data
         size: 9
        scalars:<NULL>

        18 
        19 
        20 
        21 
        22 
        23 
        24 
        25 
        26 

Event #8 (8): Outgoing envelope (0): #4 (27, 32)
        type: data
         size: 5
        scalars:<NULL>

        27 
        28 
        29 
        30 
        31 

Event #9 (8): Incoming envelope (1): #3 (18, 27)
        type: data
         size: 9
        scalars:<NULL>

        36 
        38 
        40 
        42 
        44 
        46 
        48 
        50 
        52 

...

Event #42 (8): Outgoing envelope (0): #21 (187, 193)
        type: data
         size: 6
        scalars:<NULL>

        187 
        188 
        189 
        190 
        191 
        192 

Event #43 (8): Incoming envelope (0): #11 (97, 100)
        type: data
         size: 3
        scalars:<NULL>

        195 
        197 
        199 

Event #44 (8): Incoming envelope (1): #12(100)
        type: poisoned

Event #45 (8): Incoming envelope (0): #12(100)
        type: poisoned

Event #46 (8): Outgoing envelope (0): #22 (193, 200)
        type: data
         size: 7
        scalars:<NULL>

        193 
        194 
        195 
        196 
        197 
        198 
        199 

Event #47 (8): Outgoing envelope (0): #23(200)
        type: poisoned