This summer we launched Workflows, a new product that allows fraud managers to build and update their fraud processes without needing to write any code. Along with Decisions, which lower the bar for implementing automatic fraud prevention, and Review Queues, which improve the experience around manual review of users and orders, Sift Science users can now quickly and easily respond to changing fraud conditions without involving engineers.
Because our users rely on these tools to make automated decisions about which orders should be shipped or blocked, which users to allow and which to ban—often in realtime—they must be accurate, fast, and always available. In this post, I’ll discuss how we built a high-performance distributed workflow runner using Kafka, ZooKeeper, and HBase.
What are workflows?
First, some background on Workflows. Previously, Sift users would call the Score API to determine whether a user or order should be banned or canceled. The score API returns a number from 0–1 indicating how likely a particular user is to be a fraudster, so developers would need to decide what to do with different score ranges (i.e., ban if score > 0.8). This kind of integration proved inflexible for the people that use Sift day-to-day, who typically come from a fraud or finance background rather than an engineering one.
With Workflows, we’ve tried to empower fraud managers to make the changes they need. They can use a UI in the Sift web console to define what should happen when specific events (like when a user completes an order) are sent to the Sift Events API. They can define criteria, like “fraud score is greater than 0.8” or “order amount is less than $50” based on event properties, Sift Science scores, and the many features we produce as part of our machine learning. And then can take actions based on those criteria, for example “take an automatic decision” and “send to a review queue.” The interface looks like this:
Once a workflow has been built and published, it will immediately begin running against events of the specified type.
Behind the scenes
Underlying this UI is a flowchart-like graph structure. We model the workflow as an acyclic directed graph that looks like this:
In this graph, the nodes (“event”, “review queue”, “approve order”, etc.) represent some sort of computation. These generally involve calling out to an external service (which we call an app) which will take in data, do some processing, external data collection, or user interaction, and finally output some data. Examples of apps include the review queue app, which provides a UI for fraud managers to take decisions on users and orders, and the Decision app, which sends a webhook when a decision has been made.
Each node also has an ordered set of edges, each of which has an associated criteria. When executing the workflow, we evaluate the criteria for each edge in order and traverse the first edge that evaluates to true. Each node additionally has a default edge that is traversed if no other edges match. These criteria can be defined on any data that was produced upstream of the edge. For example, an edge on the “scoring” node can depend on data from the original event or the scores and features output from our machine-learning system. Edges from the “review queue” node can additionally depend on the output from the review queue app (namely, the outcome of the review).
We place a few other restrictions on these graphs to ensure that they always end up in a decision state:
- They must be acyclic
- All nodes must be reachable from the root node
- All terminal nodes must be decisions (and decisions must all be terminal nodes)
The execution of workflows is managed by a component called the workflow runner. The
runner has a number of responsibilities:
- It allows clients to start workflow runs
- It evaluates criteria to determine the next node in the flow
- It schedules work by calling out to the appropriate app
- It receive updates from apps when they finish
- It times out apps that have taken too long
- It returns the status of runs back to clients
In other words, it manages the state machine of each workflow run:
How it works
When we designed the architecture for this system, we started with three prioritized goals:
For every event received for which there is a workflow defined, we should start exactly one workflow run which should eventually run to completion.
We should always be able to return the initial state of a workflow for a synchronous request.
Using the synchronous workflow API should add < 50ms latency at the 99th percentile over calling the score API directly.
These goals (along with our load requirements) led to a design that looks more like a distributed database than a typical web application.
To achieve our consistency guarantees at low latency, we used an in-memory SQL database on a single “leader” node to store the state of the system. This allowed us to do highly concurrent inserts, updates and queries at sub-ms latencies with strong consistency and take advantage of local caching. The “start workflow” call—the most latency sensitive pathway—only needed to update this in-memory DB and do a single write to HBase.
To make the system highly available, we actually run a cluster of workflow nodes made up of a single leader and one or more followers. The leader is chosen using ZooKeeper leader election. Only the leader is allowed to handle requests or make changes to the state (followers forward all requests to the leader). If the leader fails to send a heartbeat to zookeeper within its timeout, it loses the leader lock and one of the followers is promoted to leader.
High availability with replicated logs
But this leads to another problem: the in-memory database is local to the leader, so a newly-promoted leader would start up without any knowledge of the state of the system. The state is also not durable and could not be recovered in the case of failure. Fortunately, state machine replication is a well-studied problem. A common solution is to use a replicated log, a data structure that is append-only, provides strong ordering semantics, and is consistently replicated across multiple nodes. This provides fault-tolerance, along with a guarantee that all readers will consume exactly the same log entries in exactly the same order.
How do you build database replication on top of a log? By writing updates to the log instead of the database. When the leader wants to update some state, it writes an entry to the log describing exactly the update it would like to make. For example, “move run with id yxnkzjq1 to the running state.” All nodes consume this log and update their local states accordingly. So long as the update operation is deterministic the nodes will stay in sync.
There are some important subtleties here to ensure consistency. The most important is that the leader must follow exactly the same steps as the followers. This means means that it cannot directly apply updates to its local state; instead, it must write the update to the log, wait until it consumes that update off the log, then try to apply it and notify the writer thread whether this was successful. It must also not consider an update to have been committed until this process has finished. Without this step, the leader could update its local state out of order, or in ways that conflict with the other nodes. In other words, all writes must go through the consistency system.
To implement this, we used Kafka as our log service. We already used it as our main queueing layer and it met our requirements for durability and latency. To get the strong ordering guarantees we needed, we used a single partition for our replication topic. With some tuning we were able to achieve 99p round-trip latency under 10ms with thousands of requests per second.
Consistency in failover
Another tricky issue is managing leader failover. How do we ensure that only the leader is able to write to the log? This might at first seem like a simple problem. After all, the nodes can read the current leader from ZK, and should be able to tell whether they are it. But in a distributed system, we can never rely purely on local knowledge (“am I leader?”) to achieve consistency. For example, we may see a sequence of operations that look like this:
- Check we’re leader
- GC pause for 1 minute, losing leadership
- Write to the log (even though we’re no longer leader)
We prevent this by ignoring log entries written by non-leaders. ZooKeeper makes this easier by giving each node a monotonically-increasing id (the leader is the node with the smallest id). All writes to the log include the writer’s id. Log readers keep track of the largest id they’ve seen and ignore writes from anything with a smaller id, as those must be from an old leader.
In theory, a node could start up, read the entire log, and become up to date. But this is very inefficient. It requires us to store the entire history of the system in Kafka, and would take a very long time. Instead we use checkpoints. Periodically, we write out the current state of the system to a distributed filesystem. When a node starts up, it reads the latest checkpoint, finds the highest log index included there, and begins to read and apply the log from that point until it has caught up.
Workflows has now been running in production for 5 months and has served billions of requests. Through designing and building this product, we learned some interesting lessons:
- Starting with measurable goals was very effective. It directed our effort where it would make the most impact and provided a more objective criteria for evaluating design options.
- Sometimes use unconventional designs make the most sense, particularly when scale and latency matter. Practitioners shouldn’t be afraid of stepping into the distributed systems literature to solve problems.
- Prototypes can be very useful to validate designs. We started by building prototypes backed by Postgres and the system described here which validated that (1) we could build this system and (2) it would perform much better than Postgres for our use case.
If you’re interested in working on interesting distributed systems problems like these, Sift is hiring!