Thursday 16 February 2017

Einfach Moving Average Java

Ein einfaches Moving Average Implementierung in Java Bei mehreren Gelegenheiten wollte ich einfache Metriken in meinem Java-Anwendungen, zum Beispiel die Anzahl der Treffer pro Stunde, oder Fehler während eines Zeitraums zu berechnen. Während der Berechnung einfacher Metriken ist nicht schrecklich schwierig, seine nur extra Arbeit und Id eher verbringen diese Zeit auf der Problem-Domain. Ich war überrascht, keine allgemein akzeptierten Lösungen für Metriken in Java zu finden. Ich fand Metrics, aber es schien ein wenig zu kompliziert und nicht gut dokumentiert - Alles, was ich wollte, war es, einen gleitenden Durchschnitt zu berechnen. Ich dachte über das Problem einiges mehr und entschied es nicht ein schwieriges Problem. Heres meine Lösung Dies funktioniert durch die Schaffung eines Arrays von Fenster-Update-Frequenz Größe, dann ein Thread setzt die Zählung auf den nächsten Index im Array auf die Update-Frequenz. Die Zählung für das Intervall ist einfach arrayi - arrayi1, das ist die jüngste Zählung minus der ältesten Zählung. Für ein 10-Minuten-Intervall ist die älteste Zählung (i1) genau 10 Minuten alt. Um einen gleitenden Durchschnitt zu unserem Code hinzuzufügen, benötigen Sie zunächst einen Zähler mit AtomicLong. Dieser Zähler sollte basierend auf den Ereignissen inkrementiert werden, die für das Berechnen interessant sind (z. B. POST-Anforderungen für einen REST-Dienst). Wir müssen die Implementierung mit Zugriff auf den Zähler bereitstellen und das wird durch die GetCount-Schnittstelle erreicht. Hier Ill erstellen einen gleitenden Durchschnitt mit einem 5-Minuten-Fenster, das jede Sekunde aktualisiert. Und um den aktuellen Durchschnitt zu erhalten, rufen wir einfach die getAverage-Methode auf: Ein wichtiges Implementierungs-Detail ist, wie die Array-Größe bestimmt wird: indem Sie das Fenster durch die Aktualisierungshäufigkeit teilen. So kann ein großes Fenster mit einer häufigen Aktualisierungshäufigkeit eine beträchtliche Menge an Speicher verbrauchen. In diesem Beispiel ist die Array-Größe vernünftig 300. Wenn wir jedoch einen 24-Stunden-gleitenden Durchschnitt mit einem Intervall von 1 Sekunde erstellt haben, wäre die Größe 86400 Eine vernünftigere Aktualisierungsfrequenz für einen Zeitraum von 24 Stunden kann alle 5 Minuten betragen (Arraygröße von 288 ). Eine weitere Überlegung der Auswahl der Fenster-und Update-Frequenz ist das Fenster muss durch die Frequenz teilbar. Zum Beispiel ist ein 2-minütiges Fenster mit einer 6-Sekunden-Aktualisierungsfrequenz ok, aber eine 7-Sekunden-Aktualisierungsfrequenz ist nicht vorhanden, da es nicht durch 120 teilbar ist. Eine IllegalArgumentException wird geworfen, wenn die Fenstermodul-Aktualisierungsfrequenz nicht Null ist. Diese Implementierung erfordert einen Thread pro gleitenden Durchschnitt, was nicht sehr effizient ist. Eine bessere Lösung wäre, einen Thread über viele Durchschnitte zu teilen. Aktualisieren. Ich habe den Code aktualisiert, um einen Thread hier zu teilen. Schließlich theres ein Anfangszustandproblem: wir dont haben Daten noch für das gesamte Fenster. Zum Beispiel, wenn Sie ein 5-Minuten-Fenster und nur 15 Sekunden Daten haben. Diese Implementierung gibt null zurück, bis wir 5 Minuten Daten haben. Ein anderer Ansatz ist, den Durchschnitt abzuschätzen. Angenommen, wir haben eine Zählung von 10 in 30 Sekunden, dann können wir den Durchschnitt als 40 in 2 Minuten abschätzen. Es besteht jedoch das Risiko eines signifikanten Fehlers, indem unvollständige Daten extrapoliert werden. Zum Beispiel, wenn wir einen Burst von 20 Hits in 2 Sekunden, wed würde schätzen 1200 pro 2 Minuten, die aller Wahrscheinlichkeit nach weit weg ist. Java Gleitender Durchschnitt Methode Wenn Sie für eine EMA, die für Streaming-Daten optimiert ist, sourced suchen Aus einer Datei oder Quoting-Dienst, die folgende Beispiel-Klasse wird Ihnen gut, im Gegensatz zu mit Brute-Force-Berechnungen. Dieser Ansatz ist besonders nützlich, wenn Sie Daten in Echtzeit verarbeiten. EMAs, ein Spezialfall gewichteter gleitender Mittelwerte, haben den Vorteil, dass die relative Gewichtung für jede nachfolgende Periode um einen konstanten Faktor f 2 (N1) abnimmt, wobei N die Anzahl der Perioden ist, über die die EMA angewendet werden soll. Die folgende Beispielklasse implementiert diesen iterativen Charakter von EMA und minimiert die rechnerischen Anforderungen gegenüber Brute-Eman-Eman-1, Kraft-Methoden oder Nachbearbeitungsverfahren. Private int numPeriods numPeriods factor 2.0 (numPeriods 1.0) Berechtigungen zurücksetzen, um EMA für den angegebenen Zeitraum zu generieren. Public void reset (int numPeriods) Gibt EMA für den Zeitraum zurück, der während des Konstruktors definiert wird. Wenn verarbeitete Perioden kleiner als der EMA-Bereich sind, wird Null zurückgegeben. Public double calculate (doppelter Preis) runningEMA factorprice (1-Faktor) runningEMA if (totalPeriods lt numPeriods) Von wo aus Sie die Preisdaten ausgeben und was Sie mit den EMA-Ergebnissen tun, liegt bei Ihnen. Wenn Sie beispielsweise die Preisdaten in einem Array haben und eine EMA in ein anderes Array berechnen möchten, funktioniert das folgende Snippet: doppelte Preise. (EXTReMe Tracker) ema (priceidx) Viel Glück und beste Wünsche für Ihr project. Cloudera Engineering Blog Einfache Moving Average, Sekundäre Sortierung und MapReduce (Teil 3) Dies ist das letzte Stück zu einer dreiteiligen Blog-Serie. Wenn Sie die vorherigen Teile zu dieser Serie sehen möchten, verwenden Sie bitte den folgenden Link: Bisher habe ich erklärt, wie die Verwendung von Excel und R als Analysetools, um die Simple Moving Average von einem kleinen Satz von Aktien Schlusskurse zu berechnen. In diesem letzten Stück zu den drei Teil Blog-Serie, werde ich in die Verwendung von MapReduce, um die Simple Moving Average unserer kleinen Beispiel-Datensatz zu finden. Dann werde ich Ihnen zeigen, wie mit dem gleichen Code, werden Sie in der Lage, die Simple Moving Average von jedem Schlusskurs Aktienkurs seit 1980. Down the Rabbit Hole mit Hadoop In den oben genannten Beispielen haben wir einen Blick auf die Berechnung der einfachen gleitenden Durchschnitt Einer relativ kleinen Datenmenge. Für viele Analysen, Excel und R sind sehr effektive Werkzeuge, aber da wir in Richtung Gigabyte, Terabyte und Petabyte Datenspeicher skaliert laufen wir in einige Probleme mit Daten Lokalität, Festplattengeschwindigkeiten und Verarbeitungsgeschwindigkeiten. Um diese Faktoren zu veranschaulichen, nehmen wir eine mythische Maschine, die eine einzelne Petabyte-Festplatte hatte, die heute ähnlich wie die Festplattengeschwindigkeiten funktionierte. Für die Zwecke dieses Beispiels verwenden Sie eine Lesegeschwindigkeit von 40 MBs. Sagen wir, dass unsere Aufgabe, durch diese Daten zu scannen und produzieren einen einfachen gleitenden Durchschnitt, der Prozessor nicht behindern die Berechnung, und wir können eine bewegte Fenster Berechnung durch die Daten an der vollen 40 MBs zu erhalten. Wir gehen auch davon aus, dass die Daten zuvor sortiert wurden und dass wir nur eine sequentielle Suche durchführen mussten, um so die Datendurchsatzrate von der Festplatte zu maximieren und 40 MBit / s an die Verarbeitungspipeline zu liefern. Basierend auf Jeff Deans 12 Numbers Jeder Ingenieur sollte wissen, dies ist ein plausibles Setup. Bei diesem Durchsatz würde unsere einfache gleitende Durchschnittsberechnung von 1 Petabyte Daten etwa 310 Tage dauern. Für die meisten Situationen, diese Betriebskosten, in Bezug auf die Zeit, macht es unangemessen zu berücksichtigen. Glücklicherweise reduzieren die Mechanismen von HDFS und MapReduce diese Faktoren, so dass wir dieses Problem eine lineare Zeit und Kapital-Funktion, um uns zu entscheiden, die Zahl der Maschinen, die wir implementieren wollen, um effizient unternehmen diese einfache gleitende durchschnittliche Scan. In dem obigen einfachen gleitenden Durchschnittsbeispiel vernachlässigten wir die Einschränkungen von: Speichern des Petabyte von Daten auf nicht-mythischer Hardware. Sortierung der Petabyte von Daten. Angesichts Hardware-Ausfall während der 310 Tage der Bearbeitungszeit. Typischerweise müssen Zeitreihen-Anwendungen die Daten an einem gewissen Punkt abtasten, was große Berge erzeugt, um zu klettern, wenn wir große Zonen von Zeitreihendaten in heutigen Systemen ansprechen möchten. Wurden Multi-Terabyte-und Multi-Petabyte-Datenquellen in der Zeitreihe Domain jeden Tag, einschließlich und in jeder dieser Domains die oben genannten Szenario ist eine sehr echte Herausforderung zu bewältigen. HDFS löst die oben genannten Speicher - und Fehlerprobleme, aber was ist mit den Sortier - und Verarbeitungsproblemen Die Sortierung großer Datenmengen an sich ist ein nicht-triviales Problem, ist aber mit ein paar Tricks in MapReduce zugänglich. Werfen wir einen Blick auf realen MapReduce-Code, den wir herunterladen können, zu kompilieren und produzieren unsere eigenen skalierbaren einfachen gleitenden Durchschnitt, um einige dieser Schmerzen zu lösen. Einfacher verschiebender Durchschnitt in MapReduce Typischerweise besteht eine MapReduce-Anwendung aus zwei Funktionen: (Sie ahnen es) eine Kartenfunktion und eine reduzierende Funktion. In der Welt der Java-Programmierung erstellen wir eine Map-Klasse und eine reduzieren Klasse, die jeweils mit erben Methoden für ihre respektvolle Zwecke nützlich. Wir verwenden das MapReduce-Programmiermodell, weil es gebaut wird, um Gleichzeitigkeitsprobleme in unseren Algorithmen zu verringern, und wir erhalten unsere skalierbare Parallelität relativ schmerzlos. Die Kartenfunktion kann Code beinhalten, der eine pro-Schlüssel-Wert-Paar-Operation ausführt, aber ihre logische Hauptoperation besteht darin, Daten durch Schlüssel zu gruppieren. Eine sehr einfache Möglichkeit, über eine Kartenfunktion nachzudenken, besteht darin, sie als logische Projektion der Daten oder einer group-by-Klausel zu betrachten. Die reduzierte Funktion wird verwendet, um diese Gruppen einzeln aufzunehmen und einen Prozess über die gruppierten Werte auszuführen. Gemeinsame Operationen in reduzierenden Funktionen umfassen: In unserem einfachen gleitenden Durchschnittsbeispiel arbeiten wir jedoch nicht auf einer pro Wertbasis, und wir erzeugen auch keine Aggregate über alle Werte hinweg. Unser Betrieb im aggregierten Sinne beinhaltet ein Schiebefenster, das seine Operationen auf einer Teilmenge der Daten bei jedem Schritt ausführt. Wir müssen auch berücksichtigen, daß die Punkte in unseren Zeitreihendaten nicht garantiert werden können, um die Reduktion zu erreichen, und müssen in den vorangegangenen Abschnitten sortiert werden. Dies liegt daran, dass bei mehreren Kartenfunktionen, die mehrere Abschnitte der Quellendaten lesen, keine Zuordnung von MapReduce zu den Schlüsselwertpaaren erfolgt, die in den Standard-Partitions - und Sortierungsschemata zusammengefasst sind. Es gibt das Szenario, wo wir partitionierte Daten sortiert haben, aber im Interesse dieses Beispiels würden wir mit den mehr Garten-Vielfalt unsortierten Zeitreihendaten umgehen. Machen wir einen ersten Durchlauf, wie wir diese MapReduce einfachen gleitenden durchschnittlichen Job entwerfen konnten. Wir möchten alle Werte einer Gruppe zusammenfassen, die eng miteinander zusammenhängen, so dass wir die einfache gleitende Durchschnittsoperation über die sortierten Zeitreihendaten anwenden können. Wir wollen jedes Zeitreihen-Schlüsselwertpaar auf ein Aktiensymbol ausgeben, um diese Werte zusammen zu gruppieren. In der reduzierten Phase können wir eine Operation, hier den einfachen gleitenden Durchschnitt, über die Daten ausführen. Da die Daten mehr als wahrscheinlich nicht an den Reduzierer in sortierter Reihenfolge ankommen, müssen wir die Daten sortieren, bevor wir den einfachen gleitenden Durchschnitt berechnen können. Ein häufiger Weg, um Daten zu sortieren ist, die Daten in den Speicher in einer Datenstruktur wie ein Heap laden, ähnlich wie dies in einem normalen Java-Programm durchgeführt wird. Verwenden Sie in diesem Fall die Javas-Prioritätswarteschlangenklasse, um unsere Daten zu sortieren. Wir müssen auch die Größe des Speichers berücksichtigen, die von den eingehenden Zeitreihendaten während der Sortierung verwendet wird, da dies ein einschränkender Faktor für die Menge der Daten ist, die wir sortieren können. In diesem Entwurf müssen wir alle Zeitreihendaten laden, bevor wir die Verarbeitung starten können und wenn die zu sortierende Datenmenge die verfügbare Heapgröße übersteigt, haben wir ein Problem. Ein Beispiel für diese Implementierung wird bei github gehostet: Um diesen Code auf Ihrem eigenen Hadoop-Cluster auszuführen, laden Sie CDH von Cloudera herunter und richten Sie einen pseudo-verteilten Cluster 8211 ein, der ein einzelner Knoten von Hadoop ist. Pseudo-verteilter Modus ist eine große Weise, Code mit Hadoop auszuprobieren. Nächsten Download und kompilieren Sie den gleitenden Durchschnitt Code in ein Glas. Um den Code direkt von github herunterzuladen (in der Shell im MacOSX, ssh-Terminalfenster in linux oder MINGW32 für win32), verwenden wir den Befehl: Unser erster Pass ist eine anständige Lösung, wurde aber von unserem Java Virtual Machine (JVM) - Kind begrenzt Heap-Größe, und wir nehmen uns Zeit, die Daten selbst manuell zu sortieren. Mit einigen Konstruktionsänderungen können wir diese beiden Probleme lösen, indem wir einige der inhärenten Eigenschaften von MapReduce nutzen. Zuerst wollen wir den Fall der Sortierung der Daten im Speicher auf jedem Reduzierer betrachten. Derzeit müssen wir sicherstellen, dass wir nie mehr Daten an ein einzelnes Reduzierstück senden, als im Speicher passen können. Die Art und Weise, die wir derzeit steuern können, ist, jedem Reduzierer Kind JVM mehr Heap andor zu geben, um unsere Zeitreihendaten in der Kartenphase weiter zu trennen. In diesem Fall wed Partition weiter durch die Zeit, brechen unsere Daten in kleinere Fenster der Zeit. Im Gegensatz zur weiteren Partitionierung der Daten ist ein anderer Ansatz für dieses Problem, Hadoop zu erlauben, die Daten für uns in der sogenannten Shuffle-Phase von MapReduce zu sortieren. Wenn die Daten an einem Reduzierer bereits in sortierter Reihenfolge ankommen, können wir unseren Speicherbedarf senken und die Anzahl von Schleifen durch die Daten reduzieren, indem wir nur die nächsten N Abtastwerte für jede einfache gleitende Durchschnittsberechnung ansehen. Dies bringt uns auf den entscheidenden Aspekt dieses Artikels, die so genannte shuffles sekundäre Art Mechaniker. Sortierung ist etwas, das wir Hadoop für uns lassen können und Hadoop hat sich als sehr gut bei der Sortierung großer Datenmengen bewährt und gewann den Grey Sort-Wettbewerb im Jahr 2008. Bei der Verwendung der sekundären Sortiermechaniker können wir sowohl unsere Heap-und sortieren Fragen ziemlich einfach lösen Und effizient. Um die sekundäre Sortierung in unserem Code zu verwenden, müssen wir den Schlüssel zu einem Komposit aus dem natürlichen Schlüssel und dem natürlichen Wert machen. Unten in Abbildung-1 sehen wir ein Diagramm, wie dies visuell aussehen würde. Abbildung 1: Zusammengesetztes Schlüsseldiagramm Der zusammengesetzte Schlüssel gibt Hadoop die benötigten Informationen während des Shuffles, um eine Sortierung nicht nur auf dem 8220stock symbol8221 durchzuführen, sondern auch auf dem Zeitstempel. Die Klasse, die diese Composite Keys sortiert, wird als Schlüsselkomparator oder hier 8220CompositeKeyComparator8221 bezeichnet. Der Schlüsselkomparator sollte durch die zusammengesetzte Taste, die die Kombination aus dem natürlichen Schlüssel und dem natürlichen Wert ist, bestellen. Wir sehen unten in Abbildung-2, wo eine abstrakte Version der sekundären Sortierung auf einem zusammengesetzten Schlüssel von 2 ganzen Zahlen durchgeführt wird. Abbildung-2: CompositeKeyComparator Sortierung Composite Keys (Schlüssel sind Ganzzahlen). In Abbildung-3 unten sehen wir ein realistischeres Beispiel, wo wir den Composite Key geändert haben, um eine Stock-Symbolkette (K1) und einen Zeitstempel (K2, angezeigt als ein Datum, aber in dem Code ist ein long in ms) zu haben. Das Diagramm hat die KV-Paare durch beide 8220K1 sortiert: Lager-Symbol8221 (natürliche Taste) und 8220K2: Zeitstempel8221 (Sekundärschlüssel). Abbildung 3: CompositeKeyComparator bei der Arbeit an unseren zusammengesetzten Tasten. Zusammengesetzte Taste wird nun mit einem String-Stock-Symbol (K1) und einem Datum (K2) dargestellt. Sobald wir unsere Daten auf dem zusammengesetzten Schlüssel sortiert haben, müssen wir nun die Daten für die reduzierte Phase partitionieren. In Abbildung 4 sehen wir, wie die Daten aus Abbildung 3 oben mit dem NaturalKeyPartitioner partitioniert wurden. Abbildung 4: Partitionierung durch den natürlichen Schlüssel mit dem NaturalKeyPartitioner. Nachdem wir unsere Daten partitioniert haben, können die Reduzierer nun mit dem Herunterladen der Partitionsdateien beginnen und ihre Zusammenführungsphase beginnen. In Abbildung 5 sehen wir, wie der Gruppierungskomparator oder NaturalKeyGroupingComparator verwendet wird, um sicherzustellen, dass ein reduzierter () Aufruf nur die logisch gruppierten Daten für diesen zusammengesetzten Schlüssel sieht. Abbildung 5: Gruppieren von Komparatoren, die Partitionsdateien zusammenführen. Der Partitionierer und Gruppierungsvergleicher für die zusammengesetzte Taste sollte nur den natürlichen Schlüssel für die Partitionierung und Gruppierung berücksichtigen. Unten ist eine kurze Beschreibung des Simple Moving Average-Codes, der geändert wird, um die sekundäre Sortierung zu verwenden, und wird auf Github gehostet. Wenn Sie bemerken, passen die Namen der Klassen eng mit der in den obigen Diagrammen verwendeten Terminologie und in Tom Whites Hadoop: The Definitive Guide (Kapitel 8 MapReduce Features) zusammen, um den Code leichter verständlich zu machen. NaturalKey 8211, was Sie normalerweise als Schlüssel oder Gruppe nach Operator verwenden würden. In diesem Fall ist der Natural Key das Gruppen - oder Aktiensymbol, da potenziell unsortierte Bestandsdaten gruppiert werden müssen, bevor wir sie sortieren und den einfachen gleitenden Durchschnitt berechnen können. Composite Key 8211 Ein Schlüssel, der eine Kombination aus dem natürlichen Schlüssel und dem natürlichen Wert ist, den wir sortieren möchten. 5 Antworten auf ldquo Einfache Moving Average, Sekundäre Sortierung und MapReduce (Teil 3) rdquo Cool Trick mit dem Split sorterpartitioner. Soweit ich kann sagen, diese Werke groß, bis die Serie extrem lange (denken 30 Jahre Tick-Level-Daten) 8211 scheint wie die Partitionierung von Zeit könnte sehr schwierig sein. Weißt du, was in hadoop wie ein 8220overlappende partitioner8221, die die gleichen Daten auf mehrere Partitionen spucken kann ich mit Mappern, die Werte über mehrere Schlüssel dupliziert haben gespuckt haben, aber ich frage mich, ob there8217s eine konventionellere Methode, dies zu tun. Evan, Sie sind tot mit der Größe der Daten in einem einzigen Keyspace. Ich habe das gleiche Problem bei der Arbeit an dem openPDC-Projekt für die NERC: Ein Sensor könnte buchstäblich Milliarden von Punkten in einer sehr kurzen Zeit, so für Prototyp-Aufträge haben wir die Dinge an einem einzigen Tag (3.600.000ms): In einem mehr Komplexe Version würde ich überlappende Zeitschlitze verwendet haben, damit der Mapper ausreichende Daten aus benachbarten Schlüsselräumen erhalten würde, um eine einzelne Fensterlänge zu bedecken. Für jetzt I8217d sagen, Sie sind auf dem richtigen Weg mit den doppelten Werten. Ich weiß, dies ist nicht im Zusammenhang mit gleitenden Durchschnitten, aber wie genau war die SAX-Zeitreihen-Matching in PDC Ich implementiert so etwas (mit Ausnahme der MapReduce API 2), und in der Schleife der reduzieren () - Funktion, wenn die. Next () - Methode auf dem Iterator aufgerufen wird, erhalten wir einen neuen Wert, aber der Schlüssel ändert sich auch auf wundersame Weise. Vielmehr ändert sich der Teil des zusammengesetzten Schlüssels, der nicht als ein natürlicher Schlüssel verwendet wurde (der Zeitstempel in diesem Beispiel). Das war ziemlich überraschend. Wie dies passiert Post Navigation Annahme Apache Hadoop in der Bundesregierung


No comments:

Post a Comment