A story about reactive thinking

As an expert in reactive programming, Felix has put together an excellent introduction for your upcoming reactive projects.

In 2014, Max and Simon thought about starting a company to maximize user experience with modern web development. When clients came in with an idea, they did everything themselves (including research, estimates, design, development, marketing, and accounting). But they soon realized they needed to change their structure to accommodate the increasing workload. This story is about modeling their business in a reactive way.

Reactive terms

Before we start refactoring, we should clarify some terms. In the reactive world, there are observables. An observable is a data stream. You can subscribe to this stream and then receive every new element that is put into it.

Imagine you are watching TV. You get the current picture and sound as a hot stream, meaning that it is active even when nobody is watching or listening. If you are watching Netflix, on the other hand, the stream is cold: it starts only when you click on a movie and broadcasts only that movie. Observables are cold by default. Application-wise, a hot observable can be a stream that listens to user input such as clicks on certain items, while a cold observable can be an http request to a web server.

By convention, the names of observables end with a $ character in the code. In addition, observables are chainable. Each observable has a pipe member function that takes so-called operators as its arguments. You can think of an operator as a function that modifies an observable and, very importantly, returns an observable again. The observable is a class that looks like this:

class Observable {
  producer;

  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    return this.producer(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

In this class, you can see that the observable executes the producer function only when subscribe is called. In addition, the pipe function receives a list of operators that are applied one after another, with the result of each step serving as the source for the next step.
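To make the lazy behavior tangible, here is a minimal sketch that rebuilds the class above with types and records the order of events. The names MiniObservable, MiniObserver, MiniOperator and the double operator are made up for this illustration and are not part of RxJS.

```typescript
// Typed re-statement of the Observable class from the article (illustrative names).
interface MiniObserver<T> {
  next: (value: T) => void;
}

// An operator takes a source observable and returns a new observable.
type MiniOperator = (source: MiniObservable<any>) => MiniObservable<any>;

class MiniObservable<T> {
  producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: MiniObserver<T>): void {
    this.producer(observer);
  }

  pipe(...operators: MiniOperator[]): MiniObservable<any> {
    // Each operator receives the result of the previous step as its source.
    return operators.reduce<MiniObservable<any>>((source, next) => next(source), this);
  }
}

const log: string[] = [];

const numbers$ = new MiniObservable<number>((observer) => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
});

// Hypothetical operator for this sketch: doubles every value.
const double: MiniOperator = (source) =>
  new MiniObservable<number>((observer) => {
    source.subscribe({ next: (value: number) => observer.next(value * 2) });
  });

const doubled$ = numbers$.pipe(double);
log.push('pipeline built'); // the producer has not run yet

doubled$.subscribe({ next: (value) => log.push(`received ${value}`) });

console.log(log);
```

The log shows 'pipeline built' before 'producer started': building the pipe does nothing on its own, and the producer only runs once somebody subscribes.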

The customer, the company and the app

In RxJS terms, the customer is an observable derived from the environment stream. When something in the environment changes, such as the market situation, the customer receives this information automatically. Based on this information, the customer comes up with a new application idea and determines its requirements and the available budget.

const client$ = environment$.pipe(
  map(environment => {
    // analyze the market
    // find a new idea based on the market knowledge
    // return {idea, requirements, budget}
  })
);

The map operator applies a mapping function to every emitted value. Here, it maps the environment to whatever properties the developer needs: the idea, the requirements, and the budget. Max and Simon are only two people and have to handle every task themselves. This leads to a huge list of tasks and can get very chaotic. Their company currently looks like this:

const company$ = zip(
  client$,
  environment$
).pipe(
  map(([{ requirements, idea, budget }, environment]) => {
    // reject or accept the idea based on
    // environment, budget and requirements
    // transform the idea to a PoC
    // estimate risks and time schedule
    // destructure problem to small tickets
    // implement designs and software
    // return an application
  })
);

Once the application is completed, it is forwarded to the testers before being published to the Playstore. The testers receive the application and the customer's requirements and check whether every requirement is met. If not, they reject the application and do not pass it on to further subscribers.

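// Note: pluck is deprecated since RxJS 7;
// map(({ requirements }) => requirements) is the equivalent.
const requirements$ = client$.pipe(pluck('requirements'));

const testers$ = zip(
  requirements$,
  company$
).pipe(
  filter(([requirements, application]) =>
    application.matches(requirements)
  )
);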

The filter operator takes a predicate function that decides, by returning true or false, whether the emitted values (requirements and application) should continue down the stream.

The Playstore should add new applications or overwrite existing ones with new versions. In addition, all customers browsing the store (subscribing to the store) should receive the same value, regardless of how many people are watching the store.

const playstore$ = testers$.pipe(
  scan((allApps, app) => ({...allApps, [app.id]: app}), {}),
  shareReplay(1)
);
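How the accumulator in scan adds and overwrites apps can be sketched without RxJS. The following is a simplified stand-in, not the real operator: scanSketch, MiniObservable, TestedApp and AppStore are hypothetical names, and the three emitted apps are invented sample data.

```typescript
// Minimal observable, just enough for this sketch (illustrative names).
interface MiniObserver<T> {
  next: (value: T) => void;
}

class MiniObservable<T> {
  producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: MiniObserver<T>): void {
    this.producer(observer);
  }
}

// Simplified stand-in for scan: keeps an accumulator and emits it after every value.
function scanSketch<T, A>(reducer: (acc: A, value: T) => A, seed: A) {
  return (source: MiniObservable<T>): MiniObservable<A> =>
    new MiniObservable<A>((observer) => {
      let acc = seed;
      source.subscribe({
        next: (value) => {
          acc = reducer(acc, value);
          observer.next(acc);
        },
      });
    });
}

interface TestedApp {
  id: string;
  version: number;
}
type AppStore = Record<string, TestedApp>;

// Invented sample data: the 'chat' app is released twice.
const testedApps$ = new MiniObservable<TestedApp>((observer) => {
  observer.next({ id: 'chat', version: 1 });
  observer.next({ id: 'notes', version: 1 });
  observer.next({ id: 'chat', version: 2 });
});

const store$ = scanSketch<TestedApp, AppStore>(
  (allApps, app) => ({ ...allApps, [app.id]: app }),
  {}
)(testedApps$);

const snapshots: AppStore[] = [];
store$.subscribe({ next: (store) => snapshots.push(store) });

console.log(snapshots);
```

Every incoming app produces a new snapshot of the whole store. Because the apps are keyed by id, the second 'chat' release replaces the first one instead of being added next to it.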

This is the complete process, written in a reactive style. When there is a change in environment$, the data reactively flows through all of these steps. To complete the picture, we need to know that each process step only runs if the chain has a subscriber, i.e., every observable in the observable chain is subscribed to. Otherwise, the process would not be executed. This subscription process is a very important pattern in reactive programming and is discussed in detail in the following section. Keep in mind that this process may not reflect an actual real-world scenario.

The steps of the subscription

Before we expand our decoupled stream further, it's time for a little digression into how subscriptions work, a topic that is not mentioned very often. Think of environment$ as an http call to an API that can be requested to get details about companies, interests, and financial position, and that then completes. A custom observable that makes an XHR request would look like this:

function get(url: string): Observable {
  const producer = (observer: Observer) => {
    const xhr = new XMLHttpRequest();

    // emit the parsed response once, then complete
    xhr.addEventListener('load', () => {
      if (xhr.status === 200 && xhr.readyState === 4) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });

    xhr.open('GET', url);
    xhr.send();

    // teardown: abort the request when the subscriber is no longer interested
    return () => xhr.abort();
  };

  return new Observable(producer);
}
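One detail of this producer is easy to miss: it returns a function, and the Observable class hands that function back from subscribe so the caller can cancel the request. Below is a minimal sketch of that cancellation path. XMLHttpRequest is replaced by a hypothetical FakeRequest so the example runs outside a browser; MiniObservable, MiniObserver and fromRequest are likewise made-up names.

```typescript
interface MiniObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}
type Teardown = () => void;

class MiniObservable<T> {
  producer: (observer: MiniObserver<T>) => Teardown;

  constructor(producer: (observer: MiniObserver<T>) => Teardown) {
    this.producer = producer;
  }

  // subscribe returns whatever the producer returns: the teardown function
  subscribe(observer: MiniObserver<T>): Teardown {
    return this.producer(observer);
  }
}

// Hypothetical stand-in for XMLHttpRequest, driven manually via respond().
class FakeRequest {
  url: string;
  sent = false;
  aborted = false;
  onLoad: (body: string) => void = () => {};

  constructor(url: string) {
    this.url = url;
  }
  send(): void {
    this.sent = true;
  }
  abort(): void {
    this.aborted = true;
  }
  respond(body: string): void {
    if (this.sent && !this.aborted) this.onLoad(body);
  }
}

function fromRequest(request: FakeRequest): MiniObservable<string> {
  return new MiniObservable<string>((observer) => {
    request.onLoad = (body) => {
      observer.next(body);
      observer.complete?.();
    };
    request.send();
    return () => request.abort();
  });
}

const received: string[] = [];
const collect: MiniObserver<string> = {
  next: (body) => received.push(body),
  complete: () => received.push('complete'),
};

// First request: the response arrives and is delivered.
const requestA = new FakeRequest('https://publicapi.com');
fromRequest(requestA).subscribe(collect);
requestA.respond('hello');

// Second request: the subscriber cancels before the response arrives.
const requestB = new FakeRequest('https://publicapi.com');
const cancel = fromRequest(requestB).subscribe(collect);
cancel();
requestB.respond('ignored');

console.log(received, requestB.aborted);
```

Only the first response reaches the observer. The second request is aborted by calling the returned function, which is the role () => xhr.abort() plays in the get function.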

We can use the observable returned by get the RxJS way by defining an observer with next, error and complete functions:

const environment$ = get('https://publicapi.com');

environment$.subscribe({
  next: (data) => console.log(data),
  error: (err) => console.log(err),
  complete: () => console.log('The stream has completed')
});

The URL is imaginary, of course. When we subscribe to this observable, it executes the producer function, which in this case sends the XMLHttpRequest, passes the data into the stream through the observer's next function, and then completes. In RxJS, the subscribe method receives either one observer or three callbacks in exactly the same order as above, although the three-callback form is deprecated in recent versions. Let's take a look at the operators and create a custom operator.

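function map(mapOperation) {
  return (source) => {
    const producer = (observer) =>
      source.subscribe({
        next: (value) => observer.next(mapOperation(value)),
        // forward error and completion so that subscribers further down see them
        error: (err) => observer.error?.(err),
        complete: () => observer.complete?.()
      });

    return new Observable(producer);
  };
}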

In this example, map returns a function that takes a source argument and returns a new observable. The producer function of the new observable subscribes to the source observable and emits each value of the source with mapOperation applied. You can take a look at the original RxJS map operator and you will see the similarities. Now we can use the pipe function to chain this operator onto the http observable.

const currentTechnologies$ = get('https://publicapi.com').pipe(
  map((environment) => environment.currentTechnologies)
);

currentTechnologies$.subscribe({
  next: (technologies) => console.log(technologies),
  complete: () => console.log('Stream has completed')
});

What does the subscription do? You can think of each entry in a pipe as another observable. Our subscriber first subscribes to the observable returned by the map operator, and then that observable subscribes to the http observable. Only then is the actual http request executed (because it is a cold observable and is only executed on subscription). When the data arrives, it is processed inside the map operator and is then available in the subscriber's next function. In addition, the http observable completes and sends the complete state to the map observable, ending the subscription between the two. Then the map observable emits the complete state and the complete function of the subscriber fires. The subscription between the subscriber and the map observable is now gone as well. Subscription is a key concept to keep in mind for the rest of this article and whenever we work with RxJS. Let's continue building the company. It's time to get more people involved.
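The order of events described above can be made visible with a small trace. This is a sketch under assumptions: the http observable is replaced by a synchronous source that emits one number, and MiniObservable, MiniObserver and mapSketch are illustrative names rather than RxJS APIs.

```typescript
interface MiniObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

class MiniObservable<T> {
  producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: MiniObserver<T>): void {
    this.producer(observer);
  }
}

const trace: string[] = [];

// Stand-in for the http observable: emits one value, then completes.
const source$ = new MiniObservable<number>((observer) => {
  trace.push('source: subscribed');
  observer.next(21);
  observer.complete?.();
});

// Map operator that records every step it takes.
function mapSketch<A, B>(project: (value: A) => B) {
  return (source: MiniObservable<A>): MiniObservable<B> =>
    new MiniObservable<B>((observer) => {
      trace.push('map: subscribed');
      source.subscribe({
        next: (value) => {
          trace.push('map: next');
          observer.next(project(value));
        },
        complete: () => {
          trace.push('map: complete');
          observer.complete?.();
        },
      });
    });
}

mapSketch((n: number) => n * 2)(source$).subscribe({
  next: (value) => trace.push(`subscriber: next ${value}`),
  complete: () => trace.push('subscriber: complete'),
});

console.log(trace.join('\n'));
```

The trace starts with 'map: subscribed' followed by 'source: subscribed': subscriptions travel from the end of the chain up to the source, while values and the complete signal travel back down.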

Time to hire

Developers

As time went by and more and more clients requested services, founders Max and Simon thought it would be nice to have a real expert in the technology they chose for most of their projects. They also thought it would be good to have a more structured way of doing things. The only requirement that needs to be met is that the company receives software ideas and payments from the client and delivers software in return. So the two decided to hire Kris, an expert in Angular development.

const founders$ = zip(
  client$,
  environment$
).pipe(
  map(([{ idea, payments }, env]) => {
    // todo
    // reject or accept
    // transform the idea based on the environment to
    // a proof of concept
    // estimate risks
    // return parts of the payments, prototype and estimation
  })
);

As you can see, the two founders Max and Simon have significantly reduced their workload and can focus on accounting, customer acquisition and business development. From now on, they no longer need to know the development steps down to the last detail, because this is where Kris comes in.

const developers = ['Kris'];
const speedOfDevelopment = developers.length / 1000;

const developers$ = founders$.pipe(
  filter(({ payments }) => payments > 0),
  map(({ payments, prototype, estimation }) => {
    // Todo
    // create subtasks
    // implement designs
    // implement application
    // return application
  }),
  delay(1 / speedOfDevelopment)
);

He is responsible for software development and also for designs. As more and more projects are added, the founders decide to shorten the time needed to complete a project by hiring more developers. But hiring more and more developers was not enough.

const developers = ['Kris', 'Anna', 'Gustav', 'Yens', 'Maria', 'Lennard'];
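What the larger team means for the delay operator can be worked out directly from the two expressions in the developers$ snippet. The helper developmentDelay below is a hypothetical name that simply wraps that arithmetic; the RxJS delay operator interprets a number as milliseconds.

```typescript
// Wraps the arithmetic from the article: speed = team size / 1000, delay = 1 / speed.
function developmentDelay(developers: string[]): number {
  const speedOfDevelopment = developers.length / 1000;
  return 1 / speedOfDevelopment; // milliseconds passed to delay(...)
}

const soloDelay = developmentDelay(['Kris']);
const teamDelay = developmentDelay(['Kris', 'Anna', 'Gustav', 'Yens', 'Maria', 'Lennard']);
const emptyTeamDelay = developmentDelay([]);

console.log(Math.round(soloDelay));  // about 1000 ms with one developer
console.log(Math.round(teamDelay));  // about 167 ms with six developers
console.log(emptyTeamDelay);         // Infinity: without developers nothing is ever delivered
```

In this model the delay is 1000 ms divided by the team size, so six developers deliver six times faster than Kris alone. The empty-team case is worth guarding against in real code, since dividing by a speed of zero yields Infinity.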

So Max and Simon decided to look for project managers to manage projects, estimate the schedule, and break tasks into smaller subtasks. Kris also wanted designers to take over the designs. What did we achieve besides hiring developers? We decoupled our code from one big observable into smaller pieces that are much easier to manage and less error-prone. And that's a core idea of reactive software development: you divide larger parts of the program into smaller parts that act independently. This is exactly what we want to do with the project managers in our example.

Project managers

The developers did a very good job, but they were a bit disorganized, and sometimes found themselves getting bogged down trying to find a perfect solution instead of focusing on the most important parts. This is where project managers came in. They are responsible for creating an overview of all projects and small tickets for each task. Also, if there are new developers subscribing to the project manager, they should receive the latest tickets.

The developers' implementation now needs to be reworked to receive the tickets instead of the full application.

const projectLeads = ['Emma', 'Markus'];

const projectLeads$ = founders$.pipe(
  // scan receives the accumulated state first and the new value second
  scan((state, { payments, prototype, estimation }) => {
    // Todo
    // Append application to either Emma's or Markus'
    // application stack
    // Append prototype and estimation to the project
    // leads internal projects list
    // Check if project payment resources are covered
    // Return the updated state (including projects)
  }, {}),
  map(({ projects }) => {
    // Todo
    // Create small tickets for the developers and
    // (later on) designers
    // Estimate a time for each ticket
    // Return tickets
  }),
  shareReplay(1)
);

const developers$ = projectLeads$.pipe(
  map(ticket => {
    // Todo
    // Implement designs
    // Implement application
    // Return application
  })
);
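The requirement that newly subscribing developers receive the latest tickets is what shareReplay(1) provides. The idea can be sketched with a tiny class that remembers the last value. This is a simplified illustration, not the RxJS implementation (which also deals with completion, errors and unsubscribing); LatestValueBoard and the ticket names are hypothetical.

```typescript
// Remembers the latest value and replays it to anyone who subscribes later.
class LatestValueBoard<T> {
  hasValue = false;
  latest: T | undefined;
  observers: Array<(value: T) => void> = [];

  publish(value: T): void {
    this.hasValue = true;
    this.latest = value;
    this.observers.forEach((observer) => observer(value));
  }

  subscribe(observer: (value: T) => void): void {
    // Late subscribers immediately get the most recent value.
    if (this.hasValue) observer(this.latest as T);
    this.observers.push(observer);
  }
}

const ticketBoard = new LatestValueBoard<string[]>();

const seenByKris: string[][] = [];
const seenByAnna: string[][] = [];

ticketBoard.subscribe((tickets) => seenByKris.push(tickets));
ticketBoard.publish(['ticket-1']);
ticketBoard.publish(['ticket-1', 'ticket-2']);

// Anna joins later and still starts with the current ticket list.
ticketBoard.subscribe((tickets) => seenByAnna.push(tickets));
ticketBoard.publish(['ticket-1', 'ticket-2', 'ticket-3']);

console.log(seenByKris.length, seenByAnna.length);
```

Kris sees all three updates, while Anna sees two: the replayed list that was current when she joined, and the update published afterwards. Both are served by the same board, so the upstream work is done once and shared.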

Designers

At this point, the applications are technically top-notch, responsive, and use best practices and the latest technologies. Max and Simon are quite happy with the results, but there is still one important point missing. The applications do not look particularly good; they look like any other application built purely by developers, mostly with a title bar, a page navigation, some maps and a table. So the project leaders decide to hire some designers to take the usability to the next level.

const designers = ['Agathe', 'Annabelle', 'David'];

const designers$ = projectLeads$.pipe(
  filter(ticket => ticket.isForDesigners),
  map(ticket => {
    // Todo
    // Create design based on tickets
    // Return design
  })
);

Developers can now use these visually appealing and user-friendly designs to implement first-class applications.

const developers$ = projectLeads$.pipe(
  filter(ticket => ticket.isForDevelopers),
  withLatestFrom(designers$),
  map(([ticket, designs]) => {
    // Todo
    // Implement features/application
    // Return application
  })
);
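The pairing that withLatestFrom performs can be sketched with two push-based streams. This is a simplified illustration of the operator's semantics, not its RxJS implementation; MiniSubject, withLatestFromSketch and the ticket and design names are hypothetical.

```typescript
type Listener<T> = (value: T) => void;

// Minimal hot stream: values are pushed in from outside via next().
class MiniSubject<T> {
  listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Emits [sourceValue, latestOtherValue] whenever the source emits.
function withLatestFromSketch<A, B>(source: MiniSubject<A>, other: MiniSubject<B>): MiniSubject<[A, B]> {
  const output = new MiniSubject<[A, B]>();
  let hasLatest = false;
  let latest: B | undefined;

  // The other stream only updates the remembered value, it never triggers output.
  other.subscribe((value) => {
    hasLatest = true;
    latest = value;
  });

  // The source drives the output, but only once the other stream has emitted.
  source.subscribe((value) => {
    if (hasLatest) output.next([value, latest as B]);
  });

  return output;
}

const tickets = new MiniSubject<string>();
const designs = new MiniSubject<string>();
const work: string[] = [];

withLatestFromSketch(tickets, designs).subscribe(([ticket, design]) =>
  work.push(`${ticket} with ${design}`)
);

tickets.next('ticket-0');  // dropped: no design exists yet
designs.next('design-1');  // remembered, but does not emit by itself
tickets.next('ticket-1');
designs.next('design-2');
tickets.next('ticket-2');

console.log(work);
```

Two things are worth noticing for the developers$ stream: a new design alone never triggers any development work, and a ticket that arrives before the first design is dropped rather than queued.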

Discover the reactive playground here. You can see that when you trigger the process by clicking on "New Environment", the entire stream is processed again. This covers a portion of the entire company. With all the new team members on board, internal management became much more time-consuming for the founders.

Therefore, organizers had to be hired to complete the company. Their responsibilities include payroll, implementing marketing strategies, and managing vacation requests. So based on what you have learned so far, how would you implement const organizers = ['Lea', 'Karol', 'Ricarda']?

If you don't know the answer, don't hesitate to contact us for a proposed solution. We at interfacewerk develop high-end enterprise web applications and HMIs and consult on Angular, RxJS and UX topics. So don't hesitate to contact us if you have any questions about these topics.

Conclusion

One of my favorite points about using reactive patterns: independent application parts take the code to a new level of readability. Imagine if the founders had done everything themselves forever; that could have led to a complete mess. Splitting the company into several parts and assigning clear tasks and responsibilities makes the company's work efficient and productive and ensures the quality required by the customer. For code, this means defining a separate observable for each area of responsibility to keep the code clean. With RxJS and reactive programming, you achieve the same thing as Max and Simon by sharing responsibilities and decentralizing decision making. You are able to think in building blocks and respond to user actions, http responses, or whatever else you want to listen to. It allows the developer to make parts of the application completely independent. Remember, when we broke up the company block to make it more detailed, we didn't touch the rest of the process. This is one reason why refactoring and adding functionality is much easier and faster.

Many developers forget that there must be at least one subscriber to a (cold) observable for it to work. A common task is to use Angular's http.get(url) function, which returns an observable (a stream of the response). However, if there is no subscriber for it, the request is not sent at all. Subscribing can take the form of calling the observable's subscribe method or, in Angular, preferably using the built-in async pipe. In this article, we have seen the subscription process in detail. Moreover, we have seen the Observable implementation and how it handles chaining through the pipe function.

What's next?

As you have seen in the code snippets, there are plenty of operators that you can use to modify your observable. This is both a great strength and a weakness of RxJS. With more than 100 operators, new developers are often overwhelmed, confused, and don't know how to get started. One recommendation is to start with just a few and only move on to nested observables (and switchMap, mergeMap, etc.) once you understand the basic ones. It is also not necessary to touch the so-called window operators at the beginning. A complete list of all operators can be found on rxjs.dev, where a handy tool called the operator decision tree is also available. This decision tree recommends specific operators based on your description or situation.

Angular has relied on RxJS from the beginning and makes massive use of reactive patterns, so it is absolutely worth being familiar with reactive programming. Besides these benefits, RxJS can also have a big impact on performance when it is used properly. You can avoid unnecessary change detection in many situations and update the view only when the observable emits a new value. This pays off especially when rendering huge lists in the DOM.

Written by:
Felix