Sammeln von NetScaler LOGs in Elasticsearch und Anzeigen in Grafana

Heute haben wir eine sehr interessante Speisekarte, und wenn Sie für einen NetScaler in Ihrer Infrastruktur verantwortlich sind, In diesem Beitrag werden wir sehen, wie wir ihre Protokolle sammeln und dann verarbeiten und schließlich ein Dashboard in Grafana erstellen können, das in Echtzeit angezeigt wird (Das historische) die gesammelten Daten.

 

Also sagte ich:, Wir können unseren Citrix NetScaler veranlassen, seine Protokolle an Logstash zu senden, damit er sie behandelt und in strukturierte Daten trennt, die Zeilen verschiedener Protokolle sind. Wir speichern diese Informationen in einem Elasticsearch-Index und dann mit einem Konnektor bzw ‚Datenquelle‘ von Grafana können wir auf diese Daten zugreifen, um sie in Echtzeit oder in einem bestimmten Zeitraum anzuzeigen (letzte 24h, meine…). Und jeder kann das Dashboard basierend auf der Rolle erstellen, die sein NS spielt, wenn Sie haben, dass der Gateway-Dienst die öffentlichen IPs sieht, die darauf zugreifen, der Verkehr… Wenn wir zusätzlich zu den Zielen ein Content Switching-Idem haben… Wenn wir ein Load Balancing haben, dann analysieren Sie wirklich, wohin die Verbindungen gehen…  am Ende kann alles, was in einem Protokoll gespeichert ist, ausgenutzt werden und NetScaler ist ein sehr interessantes Gerät, um zu wissen, was darin passiert, wie die Visualisierung der Authentifizierungen, richtig oder falsch.

 

Zuerst müssen wir natürlich das Elastic Stack-Teil installieren, was war Logstash, Elasticsearch und Kibana; dann wäre es, unseren NetScaler anzuweisen, die Logs an Logstash zu senden, zum Hafen, den wir wollen, wir werden einen erfinden (von „System“ > „Auditierung“ > „Syslog-Überwachung“. Denken Sie daran, die Konfiguration im NetScaler zu speichern.

 

Als nächstes erstellen Sie eine Datei in Logstash, die Sie aufnehmen müssen, transformieren und senden Sie die Daten. Hören Sie zuerst auf dem im vorherigen Schritt angegebenen UDP-Port; zweite, Behandeln Sie die erhaltenen Protokolle und trennen Sie sie in verschiedene Felder, um sie schließlich in unserer Elasticsearch zu speichern.

 

Eingang {
        udp {
                Typ => "Netscaler"
                port => "1517"
                Tags => ["Netscaler"]
        }
}

Filter {

        ob [Art] == "Netscaler" {
                        grok {
                            Übereinstimmung => { "Botschaft" => [
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Kontext%{PLATZ}%{NUTZERNAME:Benutzer}@%{IPV4:IP_urigen}%{PLATZ}-%{PLATZ}Session-ID:%{PLATZ}%{INT:Session-ID}%{PLATZ}-%{PLATZ}Benutzer%{PLATZ}%{NUTZERNAME:Benutzer2}%{PLATZ}-%{PLATZ}Client_IP%{PLATZ}%{IPV4:IP_origen2}%{PLATZ}-%{PLATZ}Nat_ip%{PLATZ}\"Zugeordnete IP"%{PLATZ}-%{PLATZ}Vserver%{PLATZ}%{IPV4:vServer_ip}:%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}Startzeit%{PLATZ}\"%{DATE_US:Startdatum}:%{ZEIT:Startzeit}%{PLATZ}\"%{PLATZ}-%{PLATZ}Endzeit%{PLATZ}\"%{DATE_US:Endtermin}:%{ZEIT:Endzeit}%{PLATZ}\"%{PLATZ}-%{PLATZ}Dauer%{PLATZ}%{ZEIT:Dauer}%{PLATZ}-%{PLATZ}Http_resources_accessed%{PLATZ}%{NUMMER:Http_resources_accessed}%{PLATZ}-%{PLATZ}NonHttp_services_accessed%{PLATZ}%{NUMMER:NonHttp_services_accessed}%{PLATZ}-%{PLATZ}Total_TCP_connections%{PLATZ}%{NUMMER:Total_TCP_Verbindungen}%{PLATZ}-%{PLATZ}Total_UDP_flows%{PLATZ}%{NUMMER:otal_UDP_flows}%{PLATZ}-%{PLATZ}Total_policies_allowed%{PLATZ}%{NUMMER:Total_policies_allowed}%{PLATZ}-%{PLATZ}Total_policies_denied%{PLATZ}%{NUMMER:Total_policies_denied}%{PLATZ}-%{PLATZ}Total_bytes_send%{PLATZ}%{NUMMER:Total_bytes_send}%{PLATZ}-%{PLATZ}Total_bytes_recv%{PLATZ}%{NUMMER:Total_bytes_recv}%{PLATZ}-%{PLATZ}Total_compressedbytes_send%{PLATZ}%{NUMMER:Total_compressedbytes_send}%{PLATZ}-%{PLATZ}Total_compressedbytes_recv%{PLATZ}%{NUMMER:Total_compressedbytes_recv}%{PLATZ}-%{PLATZ}Compression_ratio_send%{PLATZ}%{NUMMER:Compression_ratio_send}\%%{PLATZ}-%{PLATZ}Compression_ratio_recv%{PLATZ}%{NUMMER:Compression_ratio_recv}\%%{PLATZ}-%{PLATZ}Abmeldemethode%{PLATZ}\"%{DATEN:Abmeldemethode}\"%{PLATZ}-%{PLATZ}Gruppe(s\)%{PLATZ}\"%{DATEN:Gruppe}\"",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Kontext%{PLATZ}%{NUTZERNAME:Benutzer}@%{IPV4:IP_urigen}%{PLATZ}-%{PLATZ}Session-ID:%{PLATZ}%{INT:Session-ID}%{PLATZ}-%{PLATZ}Benutzer%{PLATZ}%{NUTZERNAME:Benutzer2}%{PLATZ}-%{PLATZ}Client_IP%{PLATZ}%{IPV4:IP_origen2}%{PLATZ}-%{PLATZ}Nat_ip%{PLATZ}\"Zugeordnete IP"%{PLATZ}-%{PLATZ}Vserver%{PLATZ}%{IPV4:vServer_ip}:%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}%{GREEDYDATA:Nachricht}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Benutzer%{PLATZ}%{NUTZERNAME:Benutzer}%{PLATZ}-%{PLATZ}Client_IP%{PLATZ}%{IPV4:IP_urigen}%{PLATZ}-%{PLATZ}%{GREEDYDATA:Nachricht}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Quelle%{PLATZ}%{IPV4:IP_urigen}:%{NUMMER:Port_origen}%{PLATZ}-%{PLATZ}Vserver%{PLATZ}%{IPV4:vServer_ip}:%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}NatIP%{PLATZ}%{IPV4:NAT_ip}:%{NUMMER:NAT_puerto}%{PLATZ}-%{PLATZ}Ziel%{PLATZ}%{IPV4:IP_Ziel}:%{NUMMER:Zielhafen}%{PLATZ}-%{PLATZ}Delink %{PLATZ}Zeit%{PLATZ}%{DATE_US:Link_close}:%{ZEIT:Delink_time}%{PLATZ}-%{PLATZ}Total_bytes_send%{PLATZ}%{INT:Total_bytes_sent}%{PLATZ}-%{PLATZ}Total_bytes_recv%{PLATZ}%{INT:Total_bytes_received}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Quelle%{PLATZ}%{IPV4:IP_urigen}:%{NUMMER:Port_origen}%{PLATZ}-%{PLATZ}Ziel%{PLATZ}%{IPV4:Ziel-IP}:%{NUMMER:Zielhafen}%{PLATZ}-%{PLATZ}Start%{PLATZ}Zeit%{PLATZ}%{DATE_US:Startdatum}:%{ZEIT:Startzeit}%{PLATZ}-%{PLATZ}Ende%{PLATZ}Zeit%{PLATZ}%{DATE_US:Endtermin}:%{ZEIT:Endzeit}%{PLATZ}-%{PLATZ}Total_bytes_send%{PLATZ}%{INT:Total_bytes_sent}%{PLATZ}-%{PLATZ}Total_bytes_recv%{PLATZ}%{INT:Total_bytes_received}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Quelle%{PLATZ}%{IPV4:IP_urigen}:%{NUMMER:Port_origen}%{PLATZ}-%{PLATZ}Vserver%{PLATZ}%{IPV4:vServer_ip}:%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}NatIP%{PLATZ}%{IPV4:NAT_ip}:%{NUMMER:NAT_puerto}%{PLATZ}-%{PLATZ}Ziel%{PLATZ}%{IPV4:IP_Ziel}:%{NUMMER:Zielhafen}%{PLATZ}-%{PLATZ}Delink %{PLATZ}Zeit%{PLATZ}%{DATE_US:Link_close}:%{ZEIT:Delink_time}%{PLATZ}Total_bytes_send%{PLATZ}%{INT:Total_bytes_sent}%{PLATZ}-%{PLATZ}Total_bytes_recv%{PLATZ}%{INT:Total_bytes_received}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}SPCBId%{PLATZ}%{INT:SPCBId}%{PLATZ}-%{PLATZ}ClientIP%{PLATZ}%{IPV4:IP_urigen}%{PLATZ}-%{PLATZ}ClientPort%{PLATZ}%{NUMMER:Port_origen}%{PLATZ}-%{PLATZ}VserverServiceIP%{PLATZ}%{IPV4:vServer_ip}%{PLATZ}-%{PLATZ}VserverServicePort%{PLATZ}%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}ClientVersion%{PLATZ}%{DATEN:Client_Version}%{PLATZ}-%{PLATZ}CipherSuite%{PLATZ}\"%{DATEN:Cipher_suite}\"(%{PLATZ}-%{PLATZ}Sitzung%{PLATZ}Wiederverwendung%{PLATZ}-%{PLATZ}HandshakeZeit%{PLATZ}%{INT:Handshake_time}%{PLATZ}Frau)?",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}SPCBId%{PLATZ}%{INT:SPCBId}%{PLATZ}-%{PLATZ}Ausstellername%{PLATZ}\"%{PLATZ}%{DATEN:Ausstellername}\"",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}SPCBId%{PLATZ}%{INT:SPCBId}%{PLATZ}-%{PLATZ}Subjekt Name%{PLATZ}\"%{PLATZ}%{DATEN:Subjekt Name}\"",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}Kontext%{PLATZ}%{NUTZERNAME:Benutzer}@%{IPV4:IP_urigen}%{PLATZ}-%{PLATZ}Session-ID:%{PLATZ}%{INT:Session-ID}%{PLATZ}-%{PLATZ}%{Hostname:FQDN}%{PLATZ}Benutzer%{PLATZ}%{NUTZERNAME:Benutzer2}%{PLATZ}:%{PLATZ}Gruppe(s\)%{PLATZ}%{DATEN:Gruppe}%{PLATZ}:%{PLATZ}Vserver%{PLATZ}%{IPV4:vServer_ip}:%{NUMMER:vServer_puerto}%{PLATZ}-%{PLATZ}%{DATE_US:Datum}:%{ZEIT:Zeit}%{PLATZ}:%{PLATZ}%{GREEDYDATA:Nachricht}",
                              "^<%{POSINT:syslog_pri}>%{PLATZ}%{DATE_US:log_close}:%{ZEIT:Protokollzeit}%{PLATZ}%{SYSLOGHOST:NSlog_Hostname}%{PLATZ}0-PSA-0%{PLATZ}:%{PLATZ}Ursprünglich%{PLATZ}%{WORT:Log_type}%{PLATZ}%{WORT:Log_event}%{PLATZ}%{INT:Log_id}%{PLATZ}0%{PLATZ}:%{PLATZ}%{GREEDYDATA:Nachricht}"
                                       ]
                            }
                        }

                        geoip {
                          Quelle => "IP_urigen"
                          Ziel => "geoip"
                          add_field => [ "[geoip][Koordinaten]", "%{[geoip][Längengrad]}" ]
                          add_field => [ "[geoip][Koordinaten]", "%{[geoip][Breite]}"  ]
                        }

                        mutieren {
                          konvertieren => [ "[geoip][Koordinaten]", "schweben" ]
                          konvertieren => [ "Total_bytes_sent", "ganze Zahl" ]
                          konvertieren => [ "Total_bytes_received", "ganze Zahl" ]
                        }
        }
}

Ausgabe {

        ob ([Art]=="Netscaler"){

                Elasticsearch {
                   Index => "Netscaler-%{+JJJJ.MM.tt}"
                   Gastgeber => "DIRECTION_IP_ELASTICSEARCH:9200"
                }

        }
}

Also erstellen wir zum Beispiel die folgende Konfigurationsdatei ‚/etc/logstash/conf.d/netscaler.conf‘, Ich lasse dir mein Beispiel hier unten. Meine Filter sind sehr einfach und zuerst von Logstash, aber sie funktionieren 😉 also bei interesse könnt ihr sie kopieren, Und wenn Sie sie verbessern, können Sie sie an mich weitergeben 🙂 Am Ende geben wir in der Ausgabe den Elasticsearch-Server an, wohin die Daten gesendet werden sollen und den Index, wo sie gespeichert werden; Denken Sie daran, wenn Sie eine Authentifizierung haben, geben Sie die Parameter von ein ‚Nutzername‘ & ‚Passwort‘ de el ausgabe. Die Wahrheit ist, dass ich jetzt den Code sehe und er ist ziemlich verbesserungsfähig, Ich habe mich mit den Leerzeichen gekratzt, an die ich mich erinnere… Ich hoffe ihr könnt mir verzeihen aber es funktioniert, zumindest mit den neuesten Versionen wie NetScaler 13.0.

 

Sobald die Konfigurationsdatei erstellt wurde, Denken Sie daran, den Logstash-Dienst neu zu starten, um ihn neu zu laden. Nach wie immer, Wir werden nach Kibana gehen und sobald die Daten eingegeben sind, können wir zu „Management“ > „Stapelverwaltung“ > „Kibana“ > „Index Patterns“ > „Erstellen Indexmuster“ um das Indexmuster zu erstellen, ich sagte:, wie gewöhnlich (in diesem Fall und ohne die Anführungszeichen) ‚netscaler-*‘ und wir werden die Daten bereits in Elasticsearch korrekt gespeichert haben. Jetzt konnten wir uns verbinden von „Entdecken“ zu unserem NetScaler-Index und visualisieren Sie, dass er Daten sammelt.

 

Und dann sind wir, nach dem Erstellen des Index in Kibana, jetzt sollten wir in unserem Grafana eine erstellen „Datenquelle“ das deutet auf unseren Elasticsearch- und NetScaler-Index hin. Dann lässt es schon die Fantasie fliegen, ein Dashboard mit verschiedenen Panels erstellen, mit verschiedenen Daten zur Anzeige, ein Weltkarte bei eingehenden Verbindungen, einer von stil Sankey um Quell-/Ziel-IPs und den gesendeten Datenverkehr zu sehen, im Spaltenformat, in zeichnen um bestimmte Daten von beispielsweise den richtigen Logins zu sehen, falsch, die Verbindungen…

Alles kann verwendet werden, damit Sie Ideen aufnehmen und verbessern können, damit kannst du sehen was in unserem passiert(s) Citrix NetScaler in Echtzeit, eine automatische Erfrischung jeden 10 Sekunden ist sehr beeindruckend, Es wird uns auch helfen, die Zusammenfassung eines Tages zu analysieren, oder wissen, wenn etwas Seltsames passiert… wie immer, Vielen Dank an alle und mehr an diejenigen, die diese Art von Inhalten in sozialen Netzwerken verschieben 😉

 

Hector Herrero
Letzte Artikel von Hector Herrero (Alle anzeigen)