
Powering Transactions Search with Elastic – Learnings from the Field

Introduction

We see a lot of transactions at PayPal. Millions every day.

These transactions originate externally (a customer using PayPal to pay for a purchase on a website) as well as internally, as money moves through our system. Regular reports of these transactions are delivered to merchants in the form of a csv or a pdf file. Merchants use these reports to reconcile their books.

Recently, we set out to build a REST API that could return transaction data back to merchants. We also wanted to offer the capability to filter on different criteria such as name, email or transaction amount. Support for light aggregation/insight use cases was a stretch goal. This API would be used by our partners, merchants and external developers.

We chose Elastic as the data store. Elastic has proven, over the past 6 years, to be an actively developed product that constantly evolves to adapt to user needs. With a great set of core improvements introduced in version 2.x (memory-mapped doc values, auto-regulated merge throttling), we didn’t need to look any further.

Discussed below is our journey and key learnings we had along the way, in setting up and using Elastic for this project.

Will It Tip Over

Once live, the system would have tens of terabytes of data spanning 40+ billion documents. Each document would have over a hundred attributes. There would be tens of millions of documents added every day. Each one of the planned Elastic blades has 20TB of SSD storage, 256 GB RAM and 48 cores (hyper).

While we knew Elastic had all the great features we were looking for, we were not too sure if it would be able to scale to work with this volume (and velocity) of data. There are a host of non-functional requirements that arise naturally in financial systems which have to be met. Let’s limit our discussion in this post to performance – response time to be specific.

Importance of Schema

Getting the fundamentals right is the key.

When we initially set up Elastic, we turned on strict validation of fields in the documents. While this gave us a feeling of security akin to what we’re used to with relational systems (strict field and type checks), it hurt performance.

We inspected the content of the Lucene index Elastic created using Luke. With our initial index setup, Elastic was creating sub-optimal indexes. For example, in places where we had defined nested arrays (marked index="no"), Elastic was still creating hidden child documents in Lucene, one per element in the array. The behavior is documented and explained, but those documents still used up space even though we couldn’t query the property via the index. Because of this, we switched the "dynamic" index setting from strict to false. Avro helped ensure that each document conformed to a valid schema when we prepared the documents for ingestion.
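To illustrate that schema decision, here is a minimal sketch of creating an index with dynamic mapping disabled, using the Python Elasticsearch client. The index name, document type, and fields are hypothetical, and the string/not_analyzed syntax assumes an Elastic 2.x-era cluster; this is not our production mapping.

```python
# Hypothetical sketch: create an index with dynamic mapping disabled, so
# unmapped fields are kept in _source but neither indexed nor rejected.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

mapping = {
    "mappings": {
        "transaction": {
            "dynamic": False,  # ignore unmapped fields instead of rejecting them ("strict")
            "properties": {
                "transaction_id": {"type": "string", "index": "not_analyzed"},
                "payer_email":    {"type": "string", "index": "not_analyzed"},
                "amount":         {"type": "double"},
                "create_time":    {"type": "date"},
            },
        }
    }
}

es.indices.create(index="transactions-2016-w01", body=mapping)
```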

A shard should have no more than 2 billion parent plus nested child documents, if you plan to run force merge on it eventually (Lucene doc_id is an integer). This can seem high but is surprisingly easy to exceed, especially when de-normalizing high cardinality data into the source. An incorrectly configured index could have a large number of hidden Lucene documents being created under the covers.

Too Many Things to Measure

With the index schema in place, we needed a test harness to measure the performance of the cluster. We wanted to measure Elastic’s performance under different load conditions, configurations and query patterns. Taken together, the dimensions added up to more than 150 test scenarios. Sampling each by hand would be nearly impossible. jMeter and Beanshell scripting really helped here: we could auto-generate scenarios from code and have jMeter sample each one hundreds of times. The results were then fed into Tableau to help make sense of the benchmark runs. The dimensions we varied were:

  • Indexing Strategy
    • 1 month data per shard, 1 week data per shard, 1 day data per shard
  • # of shards to use
  • Benchmark different forms of the query (constant score, filter with match all, etc.; see the query sketch after this list)
  • User’s Transaction Volume
    • Test with accounts having 10 / 100 / 1000 / 10000 / 1000000 transactions per day
  • Query time range
    • 1 / 7 / 15 / 30 days
  • Store source documents in Elastic? Or store source in a different database and fetch only the matching IDs from Elastic?
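For the query-form dimension above, the two shapes we benchmarked looked roughly like the following. This is a hedged sketch with illustrative field names (account_id, create_time) and an assumed index naming pattern, not our production queries.

```python
# Sketch of two query forms to benchmark against each other. Both restrict by
# account and a 15-day date range, and ask for 500 records.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

constant_score_query = {
    "size": 500,
    "query": {
        "constant_score": {            # skip scoring entirely
            "filter": {
                "bool": {
                    "must": [
                        {"term": {"account_id": "A-12345"}},
                        {"range": {"create_time": {"gte": "2016-01-01", "lt": "2016-01-16"}}},
                    ]
                }
            }
        }
    },
}

match_all_filtered_query = {
    "size": 500,
    "query": {
        "bool": {
            "must": {"match_all": {}},  # match everything, then filter
            "filter": [
                {"term": {"account_id": "A-12345"}},
                {"range": {"create_time": {"gte": "2016-01-01", "lt": "2016-01-16"}}},
            ],
        }
    },
}

results = es.search(index="transactions-*", body=constant_score_query)
```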

Establishing a Baseline

The next step was to establish a baseline. We chose to start with a single node with one shard. We loaded a month’s worth of data (2 TB).

Tests showed we could search and get back 500 records from across 15 days in about 10 seconds when using just one node. This was good news, since it could only get better from here. It also showed that a single Elastic shard (a set of Lucene segments) can handle 2 billion documents indexed into it, more than we would end up putting in any one shard.

One takeaway was that a high segment count increases response time significantly. This might not be so obvious when querying across multiple shards, but it is very obvious when there’s only one. Use force merge if you have the option (offline index builds). Using a high enough value for refresh_interval and translog.flush_threshold_size lets Elastic collect enough data into a segment before a commit. The flip side is that it increases the latency before the data becomes available in search results (we used 4 GB and 180 seconds for this use case). We could clock over 70,000 writes per second from just one client node.
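For reference, here is a sketch of how those two settings might be applied with the Python client during a bulk load. The index name is illustrative; the values are the ones mentioned above.

```python
# Relax refresh/flush during bulk loading so Elastic builds larger segments
# before committing (values from this use case: 180 s refresh, 4 GB translog).
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

es.indices.put_settings(
    index="transactions-2016-w01",
    body={
        "index": {
            "refresh_interval": "180s",
            "translog.flush_threshold_size": "4gb",
        }
    },
)
```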

Nevertheless, data from the recent past is usually hot and we want all the nodes to chip in when servicing those requests. So next, we shard.

Sharding the Data

The same one month’s data (2 TB) was loaded onto 5 nodes with no replicas. Each Elastic node had one shard on it. We chose 5 nodes so that a few nodes remained unused; they would come in handy if the cluster started to falter and needed additional capacity, and for testing recovery scenarios. Meanwhile, the free nodes were used to load data into Elastic and acted as jMeter slave nodes.

With 5 nodes in play,

  • Response time dropped to 6 seconds (40% gain) for a query that scanned 15 days
  • Filtered queries were the most consistent performers
  • As a query scanned more records due to an increase in the date range, the response time also grew with it linearly
  • A force merge down to 20 segments brought the response time to 2.5 seconds. This showed that a good amount of time was being spent processing results from individual segments, which numbered over 300 in this case. While tuning of the segment merge process is largely automatic starting with Elastic 2.0, we can still influence the segment count using the translog settings discussed before. Also, remember we can’t run a force merge on a live index that is taking reads or writes, since it can saturate the available disk IO
  • Be sure to set the "throttle.max_bytes_per_sec" setting to 100 MB/s or more if you’re using SSDs; the default is too low (a sketch of both calls follows this list)
  • Having the source documents stored in Elastic did not affect the response time by much, maybe 20 ms. It’s surely more performant than having them off-cluster in, say, Couchbase or Oracle. This is because Lucene stores the source in a separate data structure that’s optimized for Elastic’s scatter-gather query format and is memory mapped (see the fdx and fdt files section in Lucene’s documentation). Having SSDs helped, of course
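A sketch of the force merge and store-throttle calls from the last two points, assuming 2.x-era APIs. The index name and the 200 MB/s value are illustrative, and the fully qualified setting name (indices.store.throttle.max_bytes_per_sec) is our assumption about the parameter referenced above.

```python
# Sketch (assumed Elastic 2.x APIs): force merge an offline index and raise the
# store throttle for SSD-backed nodes.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Merge an index that is no longer taking reads or writes down to a small
# number of segments (20 in the test described above).
es.indices.forcemerge(index="transactions-2016-w01", max_num_segments=20)

# Raise the store throttle; the recommendation above is 100 MB/s or more on
# SSDs (the 200mb value here is illustrative).
es.cluster.put_settings(body={
    "transient": {"indices.store.throttle.max_bytes_per_sec": "200mb"}
})
```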

 


Final Setup

The final configuration we used had 5-9 shards per index depending on the age of the data. Each index held a week’s worth of data. This got us a reasonable shard count across the cluster but is something we will continue to monitor and tweak, as the system grows.
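A sketch of how such weekly indices might be provisioned with an index template follows. The template name, index-name pattern, 2.x-style syntax, and shard count are illustrative assumptions, not our exact setup.

```python
# Apply a default shard count to every weekly index matching the pattern;
# individual weeks can still be created with more or fewer shards as needed.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

es.indices.put_template(
    name="transactions-weekly",
    body={
        "template": "transactions-*",              # 2.x-style index name pattern
        "settings": {"index": {"number_of_shards": 9}},
    },
)
```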

We saw response times around the 200 ms mark to retrieve 500 records after scanning 15 days’ worth of data with this setup. The cluster had 6 months of data loaded into it at this point.

Shard counts impact not just read times; they impact JVM heap usage due to Lucene segment metadata as well as recovery time, in case of node failure or a network partition. We’ve found it helps Elastic during rebalance if there are a number of smaller shards rather than a few large ones.

We also plan to spin up multiple instances per node to better utilize the hardware. Don’t forget to look at your kernel IO scheduler (see Elastic’s hardware recommendations) and the impact of NUMA and zone reclaim mode on Linux.


Conclusion

Elastic is a feature-rich platform for building search and data-intensive solutions. It removes the need to design for specific use cases the way some NoSQL databases require. That’s a big win for us, as it enables teams to iterate on solutions faster than would’ve been possible otherwise.

As long as we exercise due care when setting up the system and validate our assumptions on index design, Elastic proves to be a reliable workhorse to build data platforms on.

Implementing a Fast and Light Weight Geo Lookup Service

Fast Geo lookup of city and zip code for given latitude/longitude

Problem description

Geo lookup is a commonly required feature used by many websites and applications. Most smartphone applications can send latitude and longitude to server applications. Then the server applications use the latitude and longitude to perform Geo lookup.

Geo lookup falls into two categories:

  1. For a given latitude and longitude, retrieve full postal address including street, city, zip code.
  2. For a given latitude and longitude, retrieve nearest city with zip code.

The overwhelming majority of websites and applications require only the city and zip code. The scope of this software is limited to server applications that need to retrieve the nearest city and zip code for a given latitude and longitude.

This document describes a lightweight, in-memory, fast lookup of zip codes for a given latitude/longitude. It explains in detail how the inherent nature of the zip code data source can be exploited to generate a cache through pre-processing, and how the generated cache can then be served from an in-memory system to accomplish a very fast Geo lookup. Developers and product managers can implement it using the technique described in this post.

The illustration and reference implementation are provided for the US only; however, the approach can be extended internationally.

High level Geo Lookup process

Prerequisite

The data source has one record per zip code. Each record has the following elements: the zip code and its corresponding city, state, latitude and longitude.

Use case

  • Client apps, such as smartphone apps, send the user’s latitude and longitude.
  • The web service reads the latitude and longitude, and its Geo lookup implementation retrieves the nearest zip code for the user’s latitude/longitude.

This article describes how to implement and set up a fast Geo lookup service using neither a paid data source nor paid software.

Third party data and software providers

There are many providers who sell data and software; the software can be either a library or a hosted solution. However, there are also some free data sources; a few are listed under Data sources in the References section at the end of this post.

Sample records from csv

  • US|94022|Los Altos|California|CA|Santa Clara|085|37.3814|-122.1258|
  • US|94023|Los Altos|California|CA|Santa Clara|085|37.1894|-121.7053|
  • US|94024|Los Altos|California|CA|Santa Clara|085|37.3547|-122.0862|
  • US|94035|Mountain View|California|CA|Santa Clara|085|37.1894|-121.7053|
  • US|94039|Mountain View|California|CA|Santa Clara|085|37.1894|-121.7053|
  • US|94040|Mountain View|California|CA|Santa Clara|085|37.3855|-122.088|

Geo look up algorithms

These data sources typically have around 49,000 records for the US and are available in CSV format. Each record contains latitude, longitude, state, city and zip code, with one record per zip code.

The Geo lookup process typically involves the following steps:

  1. The web service or application receives the user request with latitude and longitude.
  2. Query the data source and retrieve the nearest zip code, calculated using a nearest Geo distance algorithm.

Nearest-point calculation involves selecting a set of points (within roughly an x km radius) from the CityZipLatLong database around the given latitude/longitude and determining the point with the minimum distance among those selected. For the US, assuming a 20 km radius is an optimal choice and that 20+ points are selected for the minimum distance calculation, this might take 50+ milliseconds.

The geographical distance between two latitude/longitude points is explained here: http://en.wikipedia.org/wiki/Geographical_distance. You can implement this algorithm yourself or use open source software that supports Geo point data; Elasticsearch is one such open source server that can be used for this purpose.

  1. Implement the distance algorithm yourself and scan the data to retrieve the nearest point (see the sketch after this list).
  2. Ingest the zip code data into a SQL database and implement a SQL client to retrieve the nearest point.
  3. Ingest the data into an Elasticsearch server and use a Geo point query to retrieve the nearest point.
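As an example of option 1 above, here is a minimal sketch using the haversine form of the great-circle distance and a brute-force scan over the CSV. The file name, delimiter handling and column positions are assumptions based on the sample records shown earlier; haversine is one standard formula for the geographical distance referenced above.

```python
# Sketch: haversine great-circle distance plus a brute-force nearest-zip scan
# over the ~49,000-row pipe-delimited file (country|zip|city|state|...|lat|lon|).
import csv
import math

EARTH_RADIUS_KM = 6371.0

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two lat/lon points, in kilometers."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

def nearest_zip(lat, lon, rows):
    """rows: iterable of (zip_code, city, lat, lon) tuples loaded from the CSV."""
    return min(rows, key=lambda r: haversine_km(lat, lon, r[2], r[3]))

# Assumed file name and no header row; columns follow the sample records above.
with open("us_zip_codes.csv") as f:
    rows = [(r[1], r[2], float(r[7]), float(r[8]))
            for r in csv.reader(f, delimiter="|") if len(r) > 8]

print(nearest_zip(37.38, -122.12, rows))
```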

Typical industry implementations

Typical implementations use either paid or in-house software and rely on on-demand caching, done per latitude/longitude. However, on-demand caching has a few disadvantages:

  • Every request with a new latitude/longitude still requires a call to the nearest-point service.
  • The nearest-point software (or service) must be provisioned for on-demand requests, and that service and software must be maintained.
  • The cache size may end up being huge, since latitude/longitude values have 4-digit precision. For example, the number of points in the US would be 250,000 (valid latitudes between 24.0000 and 49.9999) times 500,000 (valid longitudes between -122.0000 and -70.0000), equaling 125 billion.

Alternative perspective of data and requirement

The goal of this approach is to enable a simple and ultra-fast Geo lookup service. It avoids calling Geo distance calculation services at request time, without affecting the accuracy of the nearest-point calculation. This implies that we need to create the cache through pre-processing. In addition, the cache size should be small.

In effect, the implementation should involve the following steps:

  1. One-time pre-processing: generate the Geo lookup cache for all possible latitude/longitude values. The cache size should be no more than a few GB for the whole world.
  2. In-memory lookup during each user request.

Data and requirement analysis

  • For the US, we have 49,000 records (one per zip code).
  • The total US area is 9.8 million square km. The typical 95th-percentile distance between any two adjacent zip codes is more than 8 km.
  • Latitude for the US ranges from 24 to 50 while longitude ranges from -124 to -65.
  • A 0.01 difference in latitude is around 1.1 km, and a 0.01 difference in longitude is somewhat less than that at US latitudes.
    Given this, the percentage of points among all possible latitude/longitude values in the US that may spill over to an adjacent zip code should be less than 0.5 percent. Given the usage context, two-decimal precision should be acceptable for most websites and applications.
  • With two decimal precisions being good enough, we can determine the worst-case count of latitude/ longitude values for US contiguous states.
  • Two-decimal precision results in about 15 million (2,600 times 6,000) possible latitude/longitude values. The nearest zip code can be calculated for these 15 million latitude/longitude values through one-time pre-processing.

Cache Data storage and optimization

The process of generating the cache involves the following:

  • For each possible 2-decimal-precision latitude/longitude and radius boundary, request the nearest zip code. Given the vast area of the US, with a 20 km radius only about 7 million latitude/longitude points have a nearest city/zip code.
  • The radius boundary can start at 10 km, and if no nearest zip code is found, we progressively increase the radius (say 20 km, 30 km, 40 km) until one is found. Using this approach, we can map every possible latitude/longitude point in the US to a nearest zip code. (See Fig. 1 for an illustration.) For places like New York City, 10 km is enough to get the nearest zip code; for many locations in Nevada, a 40 km radius is required to get the nearest point.
  • With only 49,000 points available in the US data source, on average around 250 latitude/longitude points in the cache will share the same zip code. When this cache is extended to all countries in the world, it becomes necessary to reduce the cache size.
  • One technique is to store one record in the cache instead of, say, 10 records for 10 successive latitude or longitude points that share the same nearest zip code. For example, for a given latitude and longitude values from -89.16 to -89.24, if the nearest zip codes are the same, we need to store only the latitude and -89.20. In the fast lookup implementation, for a user request with, say, longitude -89.19, the values for both -89.19 and -89.20 are fetched; if -89.19 is not found in the cache, the value for -89.20 is used.
    • Example illustration
      • Data set 1: Calculated nearest zip code for given latitude (37.12) and different longitudes (-89.16….-89.24) = 12346
      • Data set 2: Calculated nearest zip code for given latitudes (42.12) and different longitudes (-67.46….-67.54) = 52318
      • Instead of storing 10 keyValue entries for each of the above data sets, we can store just one keyValue entry per data set. These will be
        • <(37.12:-89.20), 12346> and <(42.12:-67.50), 52318>
        • Good examples of such data sets are locations in Nevada, Arizona, etc. (See the territory map below in Fig 1.)
  • Instead of storing latitude/longitude as floats, multiply the values by 100 and store them as integers. As an additional optimization, latitude and longitude can be packed into a single 32-bit integer with the latitude in the 16-bit MSB half and the longitude in the 16-bit LSB half, as sketched below.
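A minimal sketch of that packing scheme, assuming the 100x-scaled values are kept as signed 16-bit (two's-complement) fields:

```python
def pack_key(lat, lon):
    """Pack 100x-scaled latitude/longitude into one 32-bit integer:
    latitude in the high 16 bits, longitude in the low 16 bits."""
    lat100 = round(lat * 100) & 0xFFFF   # e.g. 37.12  -> 3712
    lon100 = round(lon * 100) & 0xFFFF   # e.g. -89.20 -> 16-bit two's complement of -8920
    return (lat100 << 16) | lon100

def unpack_key(key):
    """Recover the (lat, lon) pair from a packed 32-bit key."""
    def to_signed(v):
        return v - 0x10000 if v >= 0x8000 else v
    return to_signed(key >> 16) / 100.0, to_signed(key & 0xFFFF) / 100.0

print(pack_key(37.12, -89.20))              # single integer key for the example above
print(unpack_key(pack_key(37.12, -89.20)))  # (37.12, -89.2)
```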


Figure 1 : US territory map from fema.gov

Pseudo code

Pseudo code for cache generation

  • For each country/region, determine the latitude/longitude boundary. For example, the US boundary is roughly latitude 24 to 49 and longitude -122 to -69.
    • For each 2-decimal precision latitude/ longitude
      • Set radius = 10Km
      • While (nearest zip code not found AND radius < 50 km)
        • Find nearest zip code
        • If (nearest zip code) found
          • Add the latitude/longitude and zip code data as a (k, v) entry in the lookup cache.
          • break
        • If not found, set radius = radius + 10

In the above steps, latitude/longitude points with the same zip code can be aggregated, reducing the number of cache entries. Each cache entry record has two fields, forming a (k, v) pair. The latitude and longitude together form the key; for storage optimization they are packed into a 32-bit integer with the latitude in the 16-bit MSB half and the longitude in the 16-bit LSB half, both stored as integers at 100 times their value. Since a 16-bit integer can hold -32768 to +32767, the full range of latitude and longitude values (-18000 to +18000) can be stored. The value v in the (k, v) pair holds the sequence ID of the zip data.
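A runnable sketch of the cache-generation loop above follows. Here nearest_zip_within() is a hypothetical helper (it could be backed by the haversine scan shown earlier or an Elasticsearch geo query), and pack_key() is the packing helper sketched previously; the boundary values are the US ranges discussed above.

```python
def build_cache(nearest_zip_within, lat_range=(24, 50), lon_range=(-124, -65)):
    """Pre-compute the nearest zip code for every 2-decimal lat/lon in the boundary.

    nearest_zip_within(lat, lon, radius_km) must return a zip code (or its
    sequence ID) if one exists within radius_km, otherwise None.
    """
    cache = {}
    for lat100 in range(lat_range[0] * 100, lat_range[1] * 100 + 1):
        for lon100 in range(lon_range[0] * 100, lon_range[1] * 100 + 1):
            lat, lon = lat100 / 100.0, lon100 / 100.0
            radius_km = 10
            while radius_km < 50:                      # try 10, 20, 30, 40 km
                zip_code = nearest_zip_within(lat, lon, radius_km)
                if zip_code is not None:
                    cache[pack_key(lat, lon)] = zip_code
                    break
                radius_km += 10
    return cache
```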

Pseudo code for zip look up for user request

This is typically executed in a web service.

On server start up

Load the lookup cache into an in-memory key-value map on startup. Alternative options include services like Couchbase or memcached.

User request processing

  1. Receive user request with latitude/ longitude
  2. Round the latitude/longitude to 2-decimal precision and multiply them by 100.
    1. Form the key in 32 bits: (latitude << 16) | longitude
    2. Note: the latitude is stored in the 16-bit MSB half and the longitude in the 16-bit LSB half. Shift the latitude left by 16 bits and bitwise OR it with the longitude.
  3. Look up in GeoLookupCache to retrieve the zip code.
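A sketch of that lookup path, assuming geo_lookup_cache is the in-memory dict produced by the pre-processing sketch and pack_key() is the helper shown earlier; the fallback to a longitude rounded to the nearest 0.1 follows the run-aggregation convention described above.

```python
def lookup_zip(lat, lon, geo_lookup_cache):
    """Return the cached zip code (or its sequence ID) for a lat/lon request."""
    zip_code = geo_lookup_cache.get(pack_key(lat, lon))
    if zip_code is None:
        # Fallback for aggregated runs: retry with the longitude rounded to the
        # nearest 0.1 degree (e.g. -89.19 -> -89.20), per the scheme above.
        zip_code = geo_lookup_cache.get(pack_key(lat, round(lon, 1)))
    return zip_code
```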

Reference implementation for Fast Geo lookup for the US

Currently, FastGeoLookup is implemented only for the US. This is a preliminary implementation, provided only as a reference, and is not tested.

https://github.com/Vish-Ram/GeoLookup

References

Data sources

  1. GeoNames.org: refer to the compliance requirements if data from this website is used.
  2. OpenGeocode: refer to the compliance requirements if data from this website is used.
  3. Geographical distance: http://en.wikipedia.org/wiki/Geographical_distance
  4. US territory map from fema.gov: https://www.fema.gov/status-map-change-requests/status-map-change-requests/status-map-change-requests/status-map-change