Hinweis:
Dieser Blogartikel ist älter als 5 Jahre – die genannten Inhalte sind eventuell überholt.
Im Rahmen eines Research-Projektes wurde die Implementierung und Praxistauglichkeit von HyperLogLog auf Apache Spark Streaming mithilfe eines einfachen Prototyps untersucht. Dieser Artikel beschreibt Grundlagen, Durchführung und Findings.
Motivation
Ein häufiger Anwendungsfall in der Datenanalyse ist die Bestimmung der Kardinalität einer Menge, also die Anzahl an einzigartigen Elementen innerhalb der Menge. Man bezeichnet dies auch als das „Count-Distinct-Problem“. Eine in der Web-Analyse verwendete Standardkennzahl ist beispielsweise die Anzahl an eindeutigen Besucher:innen eines Webangebotes. Eine triviale Implementierung dieser Funktionalität wäre beispielsweise das Hinzufügen eines Elements zu einer Datenstruktur, die möglichst effizient die Operationen add() und contains() umsetzt (zum Beispiel HashSet). Die Menge an benötigtem Arbeitsspeicher verhält sich zur untersuchten Kardinalität in diesem Fall proportional [1]. Im Umfeld von Big Data, wo die von Doug Laney geprägten Begriffe Volume, Velocity und Variety [2] als wichtige Eigenschaften gelten, ist dieser Speicherbedarf nicht tragbar. Des Weiteren ist die Analyse solcher KPIs immer häufiger möglichst zeitnah – am besten in „Echtzeit“ – erwünscht, um auch kurzfristig auf bestimmte Situationen entsprechend reagieren zu können.
Aus diesen Gründen wird häufig auf exakte Ergebnisse verzichtet und auf randomisierte Näherungsverfahren gesetzt. Dieser Bereich stellt ein großes Forschungsgebiet dar, in dem bereits in den 1980er-Jahren entsprechende Lösungen des „Count-Distinct-Problems“ entwickelt wurden. Die Auswahl an Algorithmen reicht von einfachen und intuitiven Verfahren bis hin zu mathematisch komplexen Berechnungen. Einer dieser randomisierten Näherungsverfahren ist das offensichtlich weit verbreitete HyperLogLog. Um dem „Echtzeit“-Kriterium gerecht zu werden, gibt es im Zoo an Technologien verschiedene Streaming-Werkzeuge, welche die effiziente Analyse eines Datenstroms ermöglichen sollen. Eine der am häufigsten genannten Technologien in diesem Zusammenhang ist die Streaming-Bibliothek von Apache Spark.
LogLog und HyperLogLog
Der LogLog-Algorithmus [3] basiert im Kern auf einem statistischen Zählverfahren (Probabilistic Counting) durch Bitpattern-Observation. Hierzu wird zunächst jeder eingehende Wert durch eine Hashfunktion auf eine binäre String-Repräsentation abgebildet. Es kann hierbei intuitiv die Annahme getroffen werden, dass die Hälfte aller Hashwerte mit 1, ein Viertel aller Werte mit 01 und ein Achtel aller Werte mit 001, usw. beginnen. Die Anzahl an führenden Nullen innerhalb eines jeden Hashwertes ergeben eine Rangfolge, welche in einer Bitmap wiedergespiegelt wird. Das Prinzip soll an einem kleinen Beispiel verdeutlicht werden.
Abbildung 1 zeigt die zentral gepflegte Bitmap, in der jeweils die maximale Folge von führenden Nullen anhand eines gesetzten Bits am jeweiligen Index vermerkt wird.
Durch Anwendung von Stochastic Averaging kann der Standardfehler minimiert werden. Es werden m unabhängige Bitpattern-Observations durchgeführt und am Ende der statistische Durchschnitt über alle einzelnen Zwischenergebnisse gebildet, wobei m einen von außen festzulegender Parameter darstellt. Hierzu werden die Hashwerte anhand der ersten p Bits in m = 2p unabhängige Bitmaps, die sogenannten Buckets, eingeteilt. Die verbleibenden Bits werden zur eigentlichen Analyse verwendet. Abbildung 2 zeigt acht unabhängige Buckets, für die jeweils der oben beschriebene Prozess des Probabilistic Counting durchgeführt wird.
HyperLogLog arbeitet im Grunde genau wie sein Vorgänger mit dem zuvor beschriebenen Bit-Pattern-Observable. Allerdings werden hier unter anderem eine andere mathematische Berechnung zugrundegelegt und Korrekturverfahren für sehr kleine oder sehr große Kardinalitäten hinzugefügt. Zur Parametrisierung und somit zur Einstellung des Algorithmus ist lediglich der Parameter p von außen anzugeben, welcher letztendlich über die Genauigkeit und den Speicherplatzbedarf entscheidet.
Prototyp und Architektur
Es soll ein Datenstrom, der die Seitenaufrufe einer beobachteten Webseite enthält, in Echtzeit exemplarisch auf distinkte Benutzer untersucht werden. Dies kann zum Beispiel hilfreich sein für sehr kurzfristige Entscheidungen bei großem Request-Aufkommen. Hierzu wird ein Datenstrom simuliert, welcher inhaltlich an Log-Daten eines Webservers erinnert, wobei die Testdaten in stark vereinfachter Form vorliegen. Das Ergebnis soll die in einem bestimmten Zeitfenster kontinuierlich aktualisierte Kennzahl der distinkten Besucher:innen sein.
Die aufkommenden Events werden zunächst durch ein Kafka-Messagingsystem gepuffert. Der Empfänger (in unserem Beispiel Spark Streaming) erhält somit einen gleichmäßigen, homogenen Datenstrom. Als HyperLogLog-Implementierung wurde die freie Java-Bibliothek stream-lib verwendet. Diese bietet neben HyperLogLog noch weitere Streaming-Algorithmen.
In Abbildung 3 wird die vorbereitende Transformationskette schematisch dargestellt. Zunächst werden die als String ankommenden kommagetrennten Zeilen am Komma gesplittet und es wird ein neuer Stream erzeugt, welcher jeweils nur das erste Feld ( fullvisitorid) beinhaltet. Auf Basis dieses Streams wird ein Key Value-Stream erzeugt, welcher jedem Element den Schlüssel „key“ zuweist. Dies ist ein notwendiger Schritt, um später einen globalen Zustand über die gesamte Laufzeit mithilfe von updateStateByKey() pflegen zu können.
Listing 1 zeigt die entsprechende Update-Funktion, welche bei jedem Batch-Intervall aufgerufen wird und das HyperLogLog-Objekt vorlaufend aktualisiert.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
Function2<List<String>, Optional<HyperLogLogPlus>, Optional<HyperLogLogPlus>> hllUpdateFunction = new Function2<List<String>, Optional<HyperLogLogPlus>, Optional<HyperLogLogPlus>>() { public Optional<HyperLogLogPlus> call(List<String> values, Optional<HyperLogLogPlus> state) throws Exception { HyperLogLogPlus hll; if (state.isPresent()) { hll = state.get(); values.stream().forEach(value -> hll.offer(value)); } else { hll = new HyperLogLogPlus(p); } return Optional.of(hll); } }; |
Der Aufruf dieser Funktion und die Erzeugung eines Ergebnis-Streams lassen sich, wie Listing 2 zeigt, mithilfe von updateStateByKey() und einem anschließenden map() auf die Kennzahl, umsetzen.
1 2 3 4 5 |
JavaDStream<Long> hllCounts = fullvisitorIds .updateStateByKey(hllUpdateFunction) .map(pair -> pair._2().cardinality()); |
Fazit
HyperLogLog ist durch bereits existierende Bibliotheken sehr einfach in eine Anwendung zu integrieren. Die Anpassung an die jeweiligen Use Cases ist durch lediglich einen grundlegenden Parameter ebenfalls übersichtlich. In exemplarischen Testmessungen wurden Fehler von 1 – 2 % bei einem sublinearen Speicherplatzbedarf von etwa 100 MB für 10.000.000 Elemente erreicht (inklusive Spark-Laufzeitumgebung und sonstigem Overhead durch die verwendete Bibliothek). Eine testweise Implementierung einer naiven Lösung mithilfe eines Java-HashSets wurde hingegen nach Erreichen des Heap-Limits von 4 GB abgebrochen.
Durch die Echtzeit-Verarbeitung der Daten sind schließlich viele interessante Use Cases denkbar. Darunter z. B. die Indexierung der Ergebnisse in Elasticsearch und die Darstellung der Kennzahl als Timeline durch Kibana.
Literatur
- [1] Flajolet P.; Fusy É.; Gandouet O. et al. (2007): HyperLogLog: The Analysis Of A Near-Optimal Cardinality Estimation Algorithm. DMTCS Proceedings.
- [2] Laney D. (2001): 3D Data Management: Controlling Data Volume, Velocity and Variety.
- [3] Flajolet P. und Durand M. (2003): Loglog counting of large cardinalities. Algorithms-ESA.
We’re hiring!
Tapetenwechsel gefällig? Wir sind auf der Suche nach begeisterten Big Data Engineers, die unsere Projektteams im Umfeld von Hadoop (Hortonworks), Apache Flink, Spark und Cloudera unterstützen und den ELK-Stack produktiv beherrschen. Jetzt Bewerben!
Weiterlesen
Weitere Informationen zu unserem Analytics-Portfolio, von BI über Big Data, Search und Data Science & Deep Learning, gibt es auf unserer Website, per Mail an info@inovex.de oder telefonisch unter +49 721 619 021-0.