Design of the Stela System: Enabling Stream Processing Systems to Scale-In and Scale-Out On-Demand
The volume of data is expected to grow rapidly as new devices such as smartphones, tablets, virtual reality headsets, and wearables continue to proliferate, so the central challenge is now processing huge volumes of data efficiently. With the advent of big data, several distributed processing systems have been developed, including distributed batch systems such as Hadoop and distributed stream processing systems such as Storm, System S, and Spark Streaming. These stream processing systems process large dynamic streams of data on the fly and provide high throughput, but they do not scale their servers on demand, that is, adjust resources in response to user requests. Such scaling should be seamless and efficient. On-demand elasticity must satisfy two goals: optimizing the post-scale throughput (tuples per second) and minimizing the interruption to the ongoing computation. A new system, Stela (STream processing ELasticity), is therefore proposed to satisfy both goals: for scale-out it selects the operators that will benefit most from additional resources with the least intrusion, and for scale-in it selects the machine whose removal affects overall performance the least. Stela uses a new metric, ETP (Effective Throughput Percentage), to select the best operator to receive resources during scale-out. Stela is integrated into Apache Storm.
Stream processing engines such as Borealis and Aurora enable queries to be modified on the fly, but Borealis does not support elasticity. Stormy, unlike Stela, does not take congestion into account during scale-out. StreamCloud builds elasticity into Borealis, but it does so by restructuring running topologies, which Stela avoids and considers intrusive to applications. Stela is therefore designed to intelligently select and release resources on demand, maximizing post-scale throughput for scale-out and minimizing the performance impact of removing machines for scale-in.
Stela adopts the distributed stream processing model in which each application is represented as a Directed Acyclic Graph (DAG) of operators. Each operator contains user logic for a particular function and runs as a number of instances, which determines its parallelism. An operator with no parents is called a source and reads input data (for example, from a web crawler); an operator with no children is called a sink and delivers output (for example, to a GUI application); the intermediate operators process tuples. When a user requests a scale-out operation, Stela decides which operators should receive new resources by computing their ETP metric. The ETP approach is greedy: it always gives resources to the congested operator with the highest ETP, which optimizes post-scale throughput.
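The operator-DAG model above can be sketched minimally as follows; all names here are illustrative, not Stela's actual data structures:

```python
# Minimal sketch of a stream-processing application as a DAG of operators.
# Adjacency list: operator -> list of downstream operators.
dag = {
    "spout": ["parse"],   # source: no parents (e.g., fed by a web crawler)
    "parse": ["count"],
    "count": ["sink"],
    "sink":  [],          # sink: no children (e.g., feeds a GUI application)
}

def sources(dag):
    """Operators that appear as no one's child."""
    children = {c for kids in dag.values() for c in kids}
    return [op for op in dag if op not in children]

def sinks(dag):
    """Operators with no outgoing edges."""
    return [op for op in dag if not dag[op]]

print(sources(dag))  # ['spout']
print(sinks(dag))    # ['sink']
```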
A procedure called CongestionDetection identifies congested operators. An operator is congested if the combined rate of all its inputs exceeds its combined processing rate, scaled by a CongestionRate parameter that Stela sets to 1.2. The ETP of an operator is the sum of the throughputs of all sinks reachable from it through paths containing no congested operators, divided by the sum of the throughputs of all sinks in the system; Stela computes it with a depth-first search and post-order traversal of the DAG to find the congested operator that needs additional resources. Stela then increases the parallelism of the congested operator with the highest ETP by assigning a new instance of that operator to the newly added machine; if multiple new machines are added, new operator instances are assigned in a round-robin manner. Finally, Stela recalculates the ETPs of all operators (the projected ETPs) to make the decision for the next iteration, repeating until all instance slots on the new machines are filled. The schedule is then committed by starting the appropriate executors on the new instances.
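Under the assumption that the toy DAG and rates below are made up (only the CongestionRate value of 1.2 comes from the description above), the congestion check and ETP computation can be sketched as:

```python
# Hedged sketch of Stela's congestion detection and ETP metric.
CONGESTION_RATE = 1.2

# operator -> downstream operators; s1 and s2 are sinks
dag = {"src": ["a"], "a": ["s1", "b"], "b": ["s2"], "s1": [], "s2": []}
input_rate = {"src": 0, "a": 100, "b": 40, "s1": 50, "s2": 40}  # tuples/s arriving
proc_rate  = {"src": 0, "a": 60,  "b": 40, "s1": 50, "s2": 40}  # tuples/s processed
sink_tput  = {"s1": 50.0, "s2": 40.0}                           # sink throughputs

def congested(op):
    # Congested when tuples arrive faster than the operator processes them,
    # with CongestionRate providing some slack against measurement noise.
    return input_rate[op] > CONGESTION_RATE * proc_rate[op]

def effective_sinks(op):
    """Sinks reachable from op along paths with no congested operator below
    op (a congested descendant blocks its entire subtree)."""
    found = set()
    for child in dag[op]:
        if congested(child):
            continue
        if not dag[child]:           # child is a sink
            found.add(child)
        else:
            found |= effective_sinks(child)
    return found

def etp(op):
    total = sum(sink_tput.values())
    return sum(sink_tput[s] for s in effective_sinks(op)) / total

# Scale-out target: the congested operator with the highest ETP.
congested_ops = [op for op in dag if congested(op)]
print(max(congested_ops, key=etp))  # prints: a
```

In this example only operator "a" is congested (100 > 1.2 * 60), and both sinks are reachable from it through uncongested operators, so its ETP is 1.0 and it receives the new instance.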
For scale-in, the number of machines to be removed is decided by the user, and Stela removes the machine with the minimum ETPSum, defined as the sum of the ETPs of all operator instances on that machine. The machine with the least ETPSum is found by the scale-in procedure, so that removing it has the least impact on overall performance and also reduces the downtime the application faces during rescheduling. Finally, Stela migrates the operators off the selected machines and releases those machines.
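The scale-in selection can be sketched as follows, with made-up per-instance ETPs and placements:

```python
# Hedged sketch of scale-in machine selection by ETPSum.
# ETPSum of a machine = sum of the ETPs of the operator instances it hosts;
# the machine with the smallest ETPSum is released. All values are illustrative.
etp_of_instance = {"a1": 0.6, "a2": 0.6, "b1": 0.4, "c1": 0.1}
placement = {
    "m1": ["a1", "b1"],   # machine -> hosted operator instances
    "m2": ["a2"],
    "m3": ["c1"],
}

def etp_sum(machine):
    return sum(etp_of_instance[i] for i in placement[machine])

victim = min(placement, key=etp_sum)
print(victim)  # prints: m3 (lowest ETPSum, so removing it hurts throughput least)
```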
Stela is implemented as a custom scheduler in Apache Storm, which processes live incoming stream data. Stela's ElasticityScheduler implements Storm's predefined IScheduler interface and runs as part of the Storm Nimbus daemon. It contains three modules. First, the StatisticServer collects statistics from the Storm cluster. Second, GlobalState stores important state information, such as where each task is placed in the cluster. Third, the Strategy module uses the information provided by the StatisticServer and GlobalState to create a new schedule according to the scale-in or scale-out strategy in use. If a failure occurs, Stela is aborted and restarted, relying on Storm's fault tolerance.
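A much-simplified, hypothetical sketch of how the three modules might divide responsibilities is shown below; these classes do not mirror Storm's or Stela's real interfaces, and the numbers and placeholder target choice are invented:

```python
class StatisticServer:
    """Collects throughput statistics from the cluster (made-up numbers)."""
    def rates(self):
        return {"parse": 120.0, "count": 80.0}  # tuples/s per operator

class GlobalState:
    """Tracks where each operator instance currently runs."""
    def placement(self):
        return {"parse-1": "m1", "count-1": "m2"}

class Strategy:
    """Builds a new schedule from statistics and placement."""
    def __init__(self, stats, state):
        self.stats, self.state = stats, state

    def scale_out(self, new_machine):
        # Placeholder target choice: in Stela this would be the congested
        # operator with the highest ETP, not simply the slowest operator.
        r = self.stats.rates()
        target = min(r, key=r.get)
        schedule = dict(self.state.placement())
        schedule[target + "-new"] = new_machine  # new instance on new machine
        return schedule

plan = Strategy(StatisticServer(), GlobalState()).scale_out("m3")
print(plan["count-new"])  # prints: m3
```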
Stela is tested against Storm's default scheduler and a Link Load strategy on different topologies, including star, linear, and diamond topologies as well as realistic topologies from Yahoo Inc. such as the Page Load topology and the Processing topology. Stela's post-scale throughput is 65%, 45%, and 125% better than Storm's default scheduler on the star, linear, and diamond topologies, respectively. For the Yahoo topologies, Stela improves throughput by 80% after scale-out, whereas the Link Load strategy barely increases throughput and Storm's default scheduler actually decreases it. Stela is less intrusive than Storm because it schedules operators onto the new machines instead of changing the current schedule on existing machines. However, it experiences a longer convergence time (interruption to the ongoing computation) than the default Storm scheduler and the Link Load strategy, due to the re-parallelization performed during scale-out. For scale-in, Stela preserves throughput, while default Storm's throughput drops. Overall, Stela's scale-out operation achieves post-scale throughput 45-120% higher than Storm's, and its scale-in operation performs 2X-5X better than Storm's default strategy.