Ben J. Christensen

Application Resilience in a Service-Oriented Architecture using Hystrix

Originally written for programming.oreilly.com:

Web-scale applications such as Netflix serve millions of customers using thousands of servers across multiple data centers. Unmitigated system failures impact user experience, product image, company brand and potentially even revenue. Service-oriented architectures such as these are too complex to completely understand or control and must be treated accordingly. The relationships between nodes are constantly changing as actors within the system independently evolve. Failure in the form of errors and latency will emerge from these relationships and resilient systems can easily “drift” into states of vulnerability. Infrastructure alone cannot be relied upon to achieve resilience. Application instances, as components of a complex system, must isolate failure and constantly audit for change.

At Netflix, we have spent a lot of time and energy engineering resilience into our systems. Among the tools we have built is Hystrix which specifically focuses on failure isolation and graceful degradation. It evolved from a series of production incidents involving saturated connection and/or thread pools, cascading failures, and misconfigurations of pools, queues, timeouts and other such “minor mistakes” which led to major user impact.

This open source library follows these principles in protecting our systems when novel failures inevitably occur:

  • Isolate client network interaction using the bulkhead and circuit breaker patterns.
  • Fallback and degrade gracefully when possible.
  • Fail fast when fallbacks aren’t available and rapidly recover.
  • Monitor, alert and push configuration changes with low latency (seconds).

Restricting concurrent access to a given backend service has proven to be an effective form of bulkheading as it limits the resource utilization to a concurrent request limit smaller than the total resources available in an application instance. We do this using two techniques, thread pools and semaphores. Both provide the essential quality of restricting concurrent access while threads provide the added benefit of timeouts so the caller can “walk away” if the underlying work is latent.

Isolating functionality rather than the transport layer is valuable as it not only extends the bulkhead beyond network failures and latency, but also those caused by client code. Examples include request validation logic, conditional routing to different or multiple backends, request serialization, response deserialization, response validation, and decoration. Network responses can be latent, corrupted or incompatibly changed at any time which in turn can result in unexpected failures in this application logic.

Mixed environments will also have several types of clients for the many different types of backends. Each has different configurations and most clients don’t expose themselves easily for auditing or modification in a production environment. Unfortunately it is also generally true that default configurations are not optimal and despite best efforts these leak into a system (particularly via transitive dependencies) and it only takes one misconfigured client to expose a vulnerability that results in a system outage.

Bulkheading around all of this – transport layer, network clients and client code – permits reliable protection against changing behavior, misconfigurations, transitive dependencies performing unexpected network activity, response handling failures and overall latency regardless of where it comes from.

Applying bulkheads at the functional level also enables addition of business logic for fallback behavior to allow graceful degradation when failure occurs. Failure may come via network or client code exceptions, timeouts, short-circuiting, or concurrent request throttling. All of them, however, can now be handled with the same “failure handler” to provide fallback responses. Some functionality may not be able to gracefully degrade so will “fail fast” and shed load until recovery, but many others can return stale data, use secondary systems, use defaults or other such patterns.

Operations and insight into what is going on is equally important to the actual isolation techniques. The key aspects of this are low latency metrics, low latency configuration changes and common insight into all service relationships regardless of how the network transport is being implemented.

A low-latency (~1 second) metrics stream is exposed to aggregate metrics from all application instances in a cluster. We use this stream for alerting and dashboards (as shown in a video clip below and the annotated image above) to provide visualizations of traffic, performance and health of all bulkheads on a system. Near real-time metrics have improved mean-time-to-detection (MTTD) and mean-time-to-recovery (MTTR) and increased operational effectiveness when doing deployments or dealing with production incidents. Configuration changes visually roll across a cluster of servers in seconds and the impact is seen immediately.

Auditing production is essential to maintaining a resilient system. Latency Monkey is used in production to inject latency into the system relationships of the service-oriented architecture. Latency can be far more damaging to a distributed system and is more difficult to address than “fast fail” of machines or code. While running latency simulations Hystrix real-time monitoring allows us to rapidly see the impact, determine if we are safe or need to end the test. Most times these simulations “light up” a Hystrix bulkhead on our dashboards to show they are doing their job to isolate the latency but sometimes we reveal a regression and quickly discover it, end the test and pursue a resolution which is validated in the next test run.

Another form of auditing being applied is tracking all network traffic leaving the JVM and finding those not isolated behind a bulkhead. We use this like a “canary in a coalmine” that permanently runs and takes a small percentage of production traffic to find network traffic that springs up without isolation. This can occur in the initial canary deployment with new code, or it may occur when unexpected code paths are enabled via transitive dependencies when AB tests are turned on or production configuration is changed and pushed out to a fleet of existing servers.

Graceful degradation is not purely a server-side consideration and our device and UI teams play an equally important role in making the user experience robust and capable of degrading gracefully. For example, the server can use bulkheading to isolate failure and choose to “fail silently” on a portion of a request considered optional but UIs must behave correctly or we may cause the client to fail as it tries to render data not present in a response. Fault injection via Hystrix execution hooks enables device teams to target specific UIs, devices and accounts to test failure, latency and fallback scenarios and determine whether the client code responds as it should.

Engineering resilience into an application is critical to achieving fault and latency tolerance. Operational considerations and support by client applications are equally important. These principles can be applied in many different ways and approaches will differ by language, technology stack and personal preference but hopefully our experiences, and perhaps even our open source software, can inspire improved resilience in your systems.

Filed under: Architecture, Code, Infrastructure, Performance, Production, Production Problems, Resilience Engineering, Tools

Functional Reactive Programming in the Netflix API – QCon London 2013

I had the opportunity to speak at QCon London 2013 and present Functional Reactive Programming in the Netflix API:

Filed under: Architecture, Code, Infrastructure, Performance, Production, Resilience Engineering

Functional Reactive in the Netflix API with RxJava

Originally written for and posted on the Netflix Tech Blog.

by Ben Christensen and Jafar Husain

Our recent post on optimizing the Netflix API introduced how our web service endpoints are implemented using a “functional reactive programming” (FRP) model for composition of asynchronous callbacks from our service layer.

This post takes a closer look at how and why we use the FRP model and introduces our open source project RxJava – a Java implementation of Rx (Reactive Extensions).

Embrace Concurrency

Server-side concurrency is needed to effectively reduce network chattiness. Without concurrent execution on the server, a single “heavy” client request might not be much better than many “light” requests because each network request from a device naturally executes in parallel with other network requests. If the server-side execution of a collapsed “heavy” request does not achieve a similar level of parallel execution it may be slower than the multiple “light” requests even accounting for saved network latency.

Futures are Expensive to Compose

Futures are straight-forward to use for a single level of asynchronous execution but they start to add non-trivial complexity when they’re nested.

Conditional asynchronous execution flows become difficult to optimally compose (particularly as latencies of each request vary at runtime) using Futures. It can be done of course, but it quickly becomes complicated (and thus error prone) or prematurely blocks on ‘Future.get()’, eliminating the benefit of asynchronous execution.

Callbacks Have Their Own Problems

Callbacks offer a solution to the tendency to block on Future.get() by not allowing anything to block. They are naturally efficient because they execute when the response is ready.

Similar to Futures though, they are easy to use with a single level of asynchronous execution but become unwieldy with nested composition.

Reactive

Functional reactive offers efficient execution and composition by providing a collection of operators capable of filtering, selecting, transforming, combining and composing Observable’s.

The Observable data type can be thought of as a “push” equivalent to Iterable which is “pull”. With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast with the Observable type, the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

The Observable type adds two missing semantics to the Gang of Four’s Observer pattern, which are available in the Iterable type:

  1. The ability for the producer to signal to the consumer that there is no more data available.
  2. The ability for the producer to signal to the consumer that an error has occurred.

With these two simple additions, we have unified the Iterable and Observable types. The only difference between them is the direction in which the data flows. This is very important because now any operation we perform on an Iterable, can also be performed on an Observable. Let’s take a look at an example …

Observable Service Layer

The Netflix API takes advantage of Rx by making the entire service layer asynchronous (or at least appear so) – all “service” methods return an Observable<T>.

Making all return types Observable combined with a functional programming model frees up the service layer implementation to safely use concurrency. It also enables the service layer implementation to:

  • conditionally return immediately from a cache
  • block instead of using threads if resources are constrained
  • use multiple threads
  • use non-blocking IO
  • migrate an underlying implementation from network based to in-memory cache

This can all happen without ever changing how client code interacts with or composes responses.

In short, client code treats all interactions with the API as asynchronous but the implementation chooses if something is blocking or non-blocking.

This next example code demonstrates how a service layer method can choose whether to synchronously return data from an in-memory cache or asynchronously retrieve data from a remote service and callback with the data once retrieved. In both cases the client code consumes it the same way.

Retaining this level of control in the service layer is a major architectural advantage particularly for maintaining and optimizing functionality over time. Many different endpoint implementations can be coded against an Observable API and they work efficiently and correctly with the current thread or one or more worker threads backing their execution.

The following code demonstrates the consumption of an Observable API with a common Netflix use case – a grid of movies:

That code is declarative and lazy as well as functionally “pure” in that no mutation of state is occurring that would cause thread-safety issues.

The API Service Layer is now free to change the behavior of the methods ‘getListOfLists’, ‘getVideos’, ‘getMetadata’, ‘getBookmark’ and ‘getRating’ – some blocking others non-blocking but all consumed the same way.

In the example, ‘getListOfLists’ pushes each ‘VideoList’ object via ‘onNext()’ and then ‘getVideos()’ operates on that same parent thread. The implementation of that method could however change from blocking to non-blocking and the code would not need to change.

RxJava

RxJava is our implementation of Rx for the JVM and is available in the Netflix repository in Github.

It is not yet feature complete with the .Net version of Rx, but what is implemented has been in use for the past year in production within the Netflix API.

We are open sourcing the code as version 0.5 as a way to acknowledgement that it’s not yet feature complete. The outstanding work is logged in the RxJava Issues.

Documentation is available on the RxJava Wiki including links to material available on the internet.

Some of the goals of RxJava are:

  • Stay close to the original Rx.Net implementation while adjusting naming conventions and idioms to Java
  • All contracts of Rx should be the same
  • Target the JVM not a language. The first languages supported (beyond Java itself) are Groovy, Clojure, Scala and JRuby. New language adapters can be contributed.
  • Support Java 5 (to include Android support) and higher with an eventual goal to target a build for Java 8 with its lambda support.

Here is an implementation of one of the examples above but using Clojure instead of Groovy:

Summary

Functional reactive programming with RxJava has enabled Netflix developers to leverage server-side conconcurrency without the typical thread-safety and synchronization concerns. The API service layer implementation has control over concurrency primitives, which enables us to pursue system performance improvements without fear of breaking client code.

RxJava is effective on the server for us and it spreads deeper into our code the more we use it.

We hope you find the RxJava project as useful as we have and look forward to your contributions.

Filed under: Architecture, Code, Performance, Production

Optimizing the Netflix API

Originally written for and posted on the Netflix Tech Blog:

About a year ago the Netflix API team began redesigning the API to improve performance and enable UI engineering teams within Netflix to optimize client applications for specific devices. Philosophies of the redesign were introduced in a previous post about embracing the differences between the different clients and devices.

This post is part one of a series on the architecture of our redesigned API.

Goals

We had multiple goals in creating this system, as follows:

Reduce Chattiness

One of the key drivers in pursuing the redesign in the first place was to reduce the chatty nature of our client/server communication, which could be hindering the overall performance of our device implementations.

Due to the generic and granular nature of the original REST-based Netflix API, each call returns only a portion of functionality for a given user experience, requiring client applications to make multiple calls that need to be assembled in order to render a single user experience. This interaction model is illustrated in the following diagram:

To reduce the chattiness inherent in the REST API, the discrete requests in the diagram above should be collapsed into a single request optimized for a given client. The benefit is that the device then pays the price of WAN latency once and leverages the low latency and more powerful hardware server-side. As a side effect, this also eliminates redundancies that occur for every incoming request.

A single optimized request such as this must embrace server-side parallelism to at least the same level as previously achieved through multiple network requests from the client. Because the server-side parallelized requests are running in the same network, each one should be more performant than if it was executed from the device. This must be achieved without each engineer implementing an endpoint needing to become an expert in low-level threading, synchronization, thread-safety, concurrent data structures, non-blocking IO and other such concerns.

Distribute API Development

A single team should not become a bottleneck nor need to have expertise on every client application to create optimized endpoints. Rapid innovation through fast, decoupled development cycles across a wide variety of device types and distributed ownership and expertise across teams should be enabled. Each client application team should be capable of implementing and operating their own endpoints and the corresponding requests/responses.

Mitigate Deployment Risks

The Netflix API is a Java application running on hundreds of servers processing 2+ billion incoming requests a day for millions of customers around the world. The system must mitigate risks inherent in enabling rapid and frequent deployment by multiple teams with minimal coordination.

Support Multiple Languages

Engineers implementing endpoints come from a wide variety of backgrounds with expertise including Javascript, Objective-C, Java, C, C#, Ruby, Python and others. The system should be able to support multiple languages at the same time.

Distribute Operations

Each client team will now manage the deployment lifecycle of their own web service endpoints. Operational tools for monitoring, debugging, testing, canarying and rolling out code must be exposed to a distributed set of teams so teams can operate independently.

Architecture

To achieve the goals above our architecture distilled into a few key points:

  • dynamic polyglot runtime
  • fully asynchronous service layer
  • functional reactive programming model

The following diagram and subsequent annotations explain the architecture:


[1] Dynamic Endpoints

All new web service endpoints are now dynamically defined at runtime. New endpoints can be developed, tested, canaried and deployed by each client team without coordination (unless they depend on new functionality from the underlying API Service Layer shown at item 5 in which case they would need to wait until after those changes are deployed before pushing their endpoint).

[2] Endpoint Code Repository and Management

Endpoint code is published to a Cassandra multi-region cluster (globally replicated) via a RESTful Endpoint Management API used by client teams to manage their endpoints.

[3] Dynamic Polyglot JVM Language Runtime

Any JVM language can be supported so each team can use the language best suited to them.

The Groovy JVM language was chosen as our first supported language. The existence of first-class functions (closures), list/dictionary syntax, performance and debuggability were all aspects of our decision. Moreover, Groovy provides syntax comfortable to a wide range of developers, which helps to reduce the learning curve for the first language on the platform.

[4 & 5] Asynchronous Java API + Functional Reactive Programming Model

Embracing concurrency was a key requirement to achieve performance gains but abstracting away thread-safety and parallel execution implementation details from the client developers was equally important in reducing complexity and speeding up their rate of innovation. Making the Java API fully asynchronous was the first step as it allows the underlying method implementations to control whether something is executed concurrently or not without the client code changing. We chose a functional reactive approach to handling composition and conditional flows of asynchronous callbacks. Our implementation is modeled after Rx Observables.

[6] Hystrix Fault Tolerance

As we have described in a previous post, all service calls to backend systems are made via the Hystrix fault tolerance layer (which was recently open sourced, along with its dashboard) that isolates the dynamic endpoints and the API Service Layer from the inevitable failures that occur while executing billions of network calls each day from the API to backend systems.

The Hystrix layer is inherently mutlti-threaded due to its use of threads for isolating dependencies and thus is leveraged for concurrent execution of blocking calls to backend systems. These asynchronous requests are then composed together via the functional reactive framework.

[7] Backend Services and Dependencies

The API Service Layer abstracts away all backend services and dependencies behind facades. As a result, endpoint code accesses “functionality” rather than a “system”. This allows us to change underlying implementations and architecture with no or limited impact on the code that depends on the API. For example, if a backend system is split into 2 different services, or 3 are combined into one, or a remote network call is optimized into an in-memory cache, none of these changes should affect endpoint code and thus the API Service Layer ensures that object models and other such tight-couplings are abstracted and not allowed to “leak” into the endpoint code.

Summary

The new Netflix API architecture is a significant departure from our previous generic RESTful API.

Dynamic JVM languages combined with an asynchronous Java API and the functional reactive programming model have proven to be a powerful combination to enable safe and efficient development of highly concurrent code.

The end result is a fault-tolerant, performant platform that puts control in the hands of those who know their target applications the best.

Following posts will provide further implementation and operational details about this new architecture.

Filed under: Architecture, Code, Infrastructure, Performance, Production, Resilience Engineering

Hystrix Dashboard + Turbine Stream Aggregator

Originally posted to the Netflix Tech Blog:

by Ben Christensen, Puneet Oberai and Ben Schmaus

Two weeks ago we introduced Hystrix, a library for engineering resilience into distributed systems. Today we’re open sourcing the Hystrix dashboard application, as well as a new companion project called Turbine that provides low latency event stream aggregation.


The Hystrix dashboard has significantly improved our operations by reducing discovery and recovery times during operational events. The duration of most production incidents (already less frequent due to Hystrix) is far shorter, with diminished impact, because we are now able to get realtime insights (1-2 second latency) into system behavior.

The following snapshot shows six HystrixCommands being used by the Netflix API. Under the hood of this example dashboard, Turbine is aggregating data from 581 servers into a single stream of metrics supporting the dashboard application, which in turn streams the aggregated data to the browser for display in the UI.

When a circuit is failing then it changes colors (gradient from green through yellow, orange and red) such as this:

The diagram below shows one “circuit” from the dashboard along with explanations of what all of the data represents.

We’ve purposefully tried to pack a lot of information into the dashboard so that engineers can quickly consume and correlate data.

The following video shows the dashboard operating with data from a Netflix API cluster:

The Turbine deployment at Netflix connects to thousands of Hystrix-enabled servers and aggregates realtime streams from them. Netflix uses Turbine with a Eureka plugin that handles instances joining and leaving clusters (due to autoscaling, red/black deployments, or just being unhealthy).

Our alerting systems have also started migrating to Turbine-powered metrics streams so that in one minute of data there are dozens or hundreds of points of data for a single metric. This high resolution of metrics data makes for better and faster alerting.

The Hystrix dashboard can be used either to monitor an individual instance without Turbine or in conjunction with Turbine to monitor multi-machine clusters:

Turbine can be found on Github at:
https://github.com/Netflix/Turbine

Dashboard documentation is at:
https://github.com/Netflix/Hystrix/wiki/Dashboard

We expect people to want to customize the UI so the javascript modules have been implemented in a way that they can easily be used standalone in existing dashboards and applications. We also expect different perspectives on how to visualize and represent data and look forward to contributions back to both Hystrix and Turbine.

We are always looking for talented engineers so if you’re interested in this type of work contact us via jobs.netflix.com.

Filed under: Architecture, Code, Infrastructure, Performance, Production, Resilience Engineering, Tools

Performance and Fault Tolerance for the Netflix API – QCon Sao Paulo

A presentation I gave at QCon Sao Paulo on August 4th 2012 (http://qconsp.com/palestrante/ben-christensen)

Presentation Description

The Netflix API receives over a billion requests a day which translates into multiple billions of calls to underlying systems in the Netflix service-oriented architecture. These requests come from more than 800 different devices ranging from gaming consoles like the PS3, XBox and Wii to set-top boxes, TVs and mobile devices such as Android and iOS.

This presentation describes how the Netflix API supports those devices and achieves fault tolerance in a distributed architecture while depending on dozens of systems which can fail at any time. It also explains how a new system design allows each device to optimize API calls to their unique needs and leverage concurrency on the server-side to improve their performance.

(Some slides have been modified and notes included for readability and understanding of content without accompanying speech.)

Slides also available at SpeakerDeck.

Conference center on Google Maps

Filed under: Architecture, Code, Performance, Production

Fault Tolerance in a High Volume, Distributed System

Originally posted on the Netflix Tech Blog:


In an earlier post by Ben Schmaus, we shared the principles behind our circuit-breaker implementation. In that post, Ben discusses how the Netflix API interacts with dozens of systems in our service-oriented architecture, which makes the API inherently more vulnerable to any system failures or latencies underneath it in the stack. The rest of this post provides a more technical deep-dive into how our API and other systems isolate failure, shed load and remain resilient to failures.

Fault Tolerance is a Requirement, Not a Feature

The Netflix API receives more than 1 billion incoming calls per day which in turn fans out to several billion outgoing calls (averaging a ratio of 1:6) to dozens of underlying subsystems with peaks of over 100k dependency requests per second.

This all occurs in the cloud across thousands of EC2 instances.

Intermittent failure is guaranteed with this many variables, even if every dependency itself has excellent availability and uptime.

Without taking steps to ensure fault tolerance, 30 dependencies each with 99.99% uptime would result in 2+ hours downtime/month (99.99%30 = 99.7% uptime = 2+ hours in a month).

When a single API dependency fails at high volume with increased latency (causing blocked request threads) it can rapidly (seconds or sub-second) saturate all available Tomcat (or other container such as Jetty) request threads and take down the entire API.

Thus, it is a requirement of high volume, high availability applications to build fault tolerance into their architecture and not expect infrastructure to solve it for them.

Netflix DependencyCommand Implementation

The service-oriented architecture at Netflix allows each team freedom to choose the best transport protocols and formats (XML, JSON, Thrift, Protocol Buffers, etc) for their needs so these approaches may vary across services.

In most cases the team providing a service also distributes a Java client library.

Because of this, applications such as API in effect treat the underlying dependencies as 3rd party client libraries whose implementations are “black boxes”. This in turn affects how fault tolerance is achieved.

In light of the above architectural considerations we chose to implement a solution that uses a combination of fault tolerance approaches:

  • network timeouts and retries
  • separate threads on per-dependency thread pools
  • semaphores (via a tryAcquire, not a blocking call)
  • circuit breakers

Each of these approaches to fault-tolerance has pros and cons but when combined together provide a comprehensive protective barrier between user requests and underlying dependencies.

The Netflix DependencyCommand implementation wraps a network-bound dependency call with a preference towards executing in a separate thread and defines fallback logic which gets executed (step 8 in flow chart below) for any failure or rejection (steps 3, 4, 5a, 6b below) regardless of which type of fault tolerance (network or thread timeout, thread pool or semaphore rejection, circuit breaker) triggered it.

We decided that the benefits of isolating dependency calls into separate threads outweighs the drawbacks (in most cases). Also, since the API is progressively moving towards increased concurrency it was a win-win to achieve both fault tolerance and performance gains through concurrency with the same solution. In other words, the overhead of separate threads is being turned into a positive in many use cases by leveraging the concurrency to execute calls in parallel and speed up delivery of the Netflix experience to users.

Thus, most dependency calls now route through a separate thread-pool as the following diagram illustrates:

If a dependency becomes latent (the worst-case type of failure for a subsystem) it can saturate all of the threads in its own thread pool, but Tomcat request threads will timeout or be rejected immediately rather than blocking.

In addition to the isolation benefits and concurrent execution of dependency calls we have also leveraged the separate threads to enable request collapsing (automatic batching) to increase overall efficiency and reduce user request latencies.

Semaphores are used instead of threads for dependency executions known to not perform network calls (such as those only doing in-memory cache lookups) since the overhead of a separate thread is too high for these types of operations.

We also use semaphores to protect against non-trusted fallbacks. Each DependencyCommand is able to define a fallback function (discussed more below) which is performed on the calling user thread and should not perform network calls. Instead of trusting that all implementations will correctly abide to this contract, it too is protected by a semaphore so that if an implementation is done that involves a network call and becomes latent, the fallback itself won’t be able to take down the entire app as it will be limited in how many threads it will be able to block.

Despite the use of separate threads with timeouts, we continue to aggressively set timeouts and retries at the network level (through interaction with client library owners, monitoring, audits etc).

The timeouts at the DependencyCommand threading level are the first line of defense regardless of how the underlying dependency client is configured or behaving but the network timeouts are still important otherwise highly latent network calls could fill the dependency thread-pool indefinitely.

The tripping of circuits kicks in when a DependencyCommand has passed a certain threshold of error (such as 50% error rate in a 10 second period) and will then reject all requests until health checks succeed.

This is used primarily to release the pressure on underlying systems (i.e. shed load) when they are having issues and reduce the user request latency by failing fast (or returning a fallback) when we know it is likely to fail instead of making every user request wait for the timeout to occur.

How do we respond to a user request when failure occurs?

In each of the options described above a timeout, thread-pool or semaphore rejection, or short-circuit will result in a request not retrieving the optimal response for our customers.

An immediate failure (“fail fast”) throws an exception which causes the app to shed load until the dependency returns to health. This is preferable to requests “piling up” as it keeps Tomcat request threads available to serve requests from healthy dependencies and enables rapid recovery once failed dependencies recover.

However, there are often several preferable options for providing responses in a “fallback mode” to reduce impact of failure on users. Regardless of what causes a failure and how it is intercepted (timeout, rejection, short-circuited etc) the request will always pass through the fallback logic (step 8 in flow chart above) before returning to the user to give a DependencyCommand the opportunity to do something other than “fail fast”.

Some approaches to fallbacks we use are, in order of their impact on the user experience:

  • Cache: Retrieve data from local or remote caches if the realtime dependency is unavailable, even if the data ends up being stale
  • Eventual Consistency: Queue writes (such as in SQS) to be persisted once the dependency is available again
  • Stubbed Data: Revert to default values when personalized options can’t be retrieved
  • Empty Response (“Fail Silent”): Return a null or empty list which UIs can then ignore

All of this work is to maintain maximum uptime for our users while maintaining the maximum number of features for them to enjoy the richest Netflix experience possible. As a result, our goal is to have the fallbacks deliver responses as close to what the actual dependency would deliver.

Example Use Case

Following is an example of how threads, network timeouts and retries combine:

The above diagram shows an example configuration where the dependency has no reason to hit the 99.5th percentile and thus cuts it short at the network timeout layer and immediately retries with the expectation to get median latency most of the time, and accomplish this all within the 300ms thread timeout.

If the dependency has legitimate reasons to sometimes hit the 99.5th percentile (i.e. cache miss with lazy generation) then the network timeout will be set higher than it, such as at 325ms with 0 or 1 retries and the thread timeout set higher (350ms+).

The threadpool is sized at 10 to handle a burst of 99th percentile requests, but when everything is healthy this threadpool will typically only have 1 or 2 threads active at any given time to serve mostly 40ms median calls.

When configured correctly a timeout at the DependencyCommand layer should be rare, but the protection is there in case something other than network latency affects the time, or the combination of connect+read+retry+connect+read in a worst case scenario still exceeds the configured overall timeout.

The aggressiveness of configurations and tradeoffs in each direction are different for each dependency.

Configurations can be changed in realtime as needed as performance characteristics change or when problems are found all without risking the taking down of the entire app if problems or misconfigurations occur.

Conclusion

The approaches discussed in this post have had a dramatic effect on our ability to tolerate and be resilient to system, infrastructure and application level failures without impacting (or limiting impact to) user experience.

Despite the success of this new DependencyCommand resiliency system over the past 8 months, there is still a lot for us to do in improving our fault tolerance strategies and performance, especially as we continue to add functionality, devices, customers and international markets.

Filed under: Architecture, Code, Infrastructure, Performance, Production, Production Problems, , , , ,

WebService Performance on EC2 Instances

As part of a cost/performance analysis of Amazon EC2 instances I wrote a very simple Java webapp to allow simple comparison benchmarking.

The webapp runs in Tomcat and has a servlet which returns a JSON response after looping to generate the JSON to cause CPU computation time as if the service were doing real work. It results in a lot of iteration and string creation activity – things web services do a lot of.

ApacheBenchmark (ab) was used as the test client to simulate traffic and capture statistics.

The instance types tested were m2.2xlargem2.4xlargecc1.4xlarge, and cc2.8xlarge.

The tests determined that the cc2.8xlarge instance type – though the most expensive by unit – is potentially the most cost effective type to use when serving large volume traffic involving a cluster of servers.

The cost to serve 1000 requests/second at a response time of approximately 18ms for each instance type is:

  • m2.2xlarge: $1.98
  • m2.4xlarge: $2.48
  • cc1.4xlarge: $1.36
  • cc2.8xlarge: $1.19

Of course, this test is very simplistic and ignores a wide variety of variables, but it demonstrates that the cc1 and cc2 boxes are well worth the time to evaluate for production application usage for achieving cost/performance efficiency.

The following graphs show the test results that were used to calculate the above costs.

Results per Second with Increasing Concurrency

This demonstrates how the 32 CPU cores on the cc2.8xlarge enable significantly higher throughput.

Time per Request (ms) with Increasing Concurrency

This shows how the response time degrades with increasing thread counts and how the larger boxes (particularly the cc2) scale better.

Spreadsheet

This screenshot of the spreadsheet shows the cost calculations at each level of concurrency.

The hi-lighted green is considered the optimal “ceiling” beyond which performance degrades and throughput plateaus. This point is considered the high-water-mark that can be used to determine the number of machines in a cluster needed to serve a given amount of traffic and thus allow comparison of different machines.

Price per 1000 Requests per Second with Increasing Concurrency

This scatter chart shows the cost to serve 1000 requests per second at increasing levels of concurrency on each instance type.

Overall, the performance of the cc1 and cc2 boxes are impressive and their cost/performance appears to be far better than the m2 instances.

The raw results of the tests can be found on Github.

Filed under: Infrastructure, Performance, ,

AtomicCircularArray Wins My Concurrency Throughput Test

I recently made several implementations of a RollingNumber (allowing a sum to be calculated over a 10 second period with 1 second increments with continual updates) to determine what would perform best under high-concurrency.

In my case, concurrency means 4-8000 writes/second from hundreds of threads on an 8-core machine and only a handful of reads per second.

In the end, a circular-array (using AtomicReferenceArray internally) won my throughput tests and also achieved my side goal of being non-blocking.

The code is on GitHub and the winning implementation is RollingNumberViaTryLockWithAtomicCircularArray.

Note that this code is NOT what I would deploy in production, since I forced each implementation to comply to the same interface so they could all be run by my test-harness.

However, a modified version of RollingNumberViaTryLockWithAtomicCircularArray is being run in production processing billions of writes per day.

Another aspect of the test is the use of ReentrantLock.tryLock() instead of a synchronized block. The tryLock() works well in this use-case, allowing only 1 thread to get the lock and routing all others around it instead of blocking them.

Test Results

Here are charts (higher is better) showing the differences in performance between implementations on two different machines:

MacBook Pro 2.2GHz Intel Core i7, 8GB Memory, SSD Drive, OSX Lion 10.7.1

$ uname -a
Darwin 11.1.0 Darwin Kernel Version 11.1.0: Tue Jul 26 16:07:11 PDT 2011; root:xnu-1699.22.81~1/RELEASE_X86_64 x86_64

$ java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03-383-11A511)
Java HotSpot(TM) 64-Bit Server VM (build 20.1-b02-383, mixed mode)

The winner is the blue bar which is the highest.

Amazon EC2 m2.4xlarge
High-Memory Quadruple Extra Large Instance 68.4 GB of memory, 26 EC2 Compute Units (8 virtual cores with 3.25 EC2 Compute Units each), 1690 GB of local instance storage, 64-bit platform

$ uname -a
Linux 2.6.21.7-2.ec2.v1.3.fc8xen #1 SMP Sat Sep 25 01:16:50 EDT 2010 x86_64 x86_64 x86_64 GNU/Linux

$ /usr/java/latest/bin/java -version
java version "1.6.0_25"
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.0-b11, mixed mode)

The winner is the orange bar which is the highest.

Running the Test

You can run the test using an executable JAR with the command:

java -Xmx1g -jar RollingNumberThroughputTest.jar

The test also validates that the code does what it should and is able to find non-thread-safe implementations by looking for incorrect math caused by concurrency bugs.

Conclusion

I found it very interesting how differently the results were on the 2 machines (operating systems and CPUs are very different, the JVM implementations are also different). In both cases though the AtomicCircularArray performed better, far better on the EC2 instance running the Sun JVM.

These tests provided me with the data to choose AtomicCircularArray and as mentioned above it is similar to what I’m now using in production.

Filed under: Code, Performance

Statistics for Why Web Performance Matters

Following are some good posts with further evidence of the impact of webpage/webapp responsiveness to the user experience, amount of usage and ultimately revenue.

Good introductory quote to the external links and images shown below:

There’s no longer any debate. There’s reliable, reproducible evidence that web page latency is directly tied to the bottom line. At Velocity, Microsoft, Google and Shopzilla made this abundantly clear in a series of awesome presentations: detailed, controlled testing proves that slower pages hurt the bottom line. In Google’s case, adding delay reduces the average number of searches a visitor does each day even after the delay is removed.

Regarding “smaller scale” sites more typical than Google and Bing:

The results of their analysis show how significant a reduction in page latency can be. In addition to reducing bounce rates, and increasing pages per visit & time on site, they found a 16.07% increase in conversion rates and a 5.50% increase in average order value.
http://radar.oreilly.com/2009/10/watching-websites.html

External Links:


http://radar.oreilly.com/2009/10/watching-websites.html


http://radar.oreilly.com/2009/07/velocity-making-your-site-fast.html


http://www.watchingwebsites.com/archives/proof-that-speeding-up-websites-improves-online-business

Good summary PDF showing metrics of performance impact.

conversion-rate-and-order-value

bing-delayimpact

Filed under: Performance, Production, User Interface

Twitter Updates

View Ben Christensen's profile on LinkedIn
Follow

Get every new post delivered to your Inbox.