Recopilando los LOGs de NetScaler en Elasticsearch y visualizándolos en Grafana
Hoy tenemos un menú bastante interesante, y si es que si tienes a tu cargo algún NetScaler en tu infraestructura, en este post veremos cómo podemos recopilar sus logs para después ser tratados y finalmente hacernos un dashboard en Grafana que muestre en tiempo real (o histórico) los datos recopilados.
Así que lo dicho, podemos hacer que nuestro Citrix NetScaler envíe sus logs a Logstash para que los trate y separe en datos estructurados lo que son líneas de distintos logs. Almacenaremos en un índice de Elasticsearch esta información y luego con un conector o ‘Data Source’ de Grafana podremos acceder a estos datos, para visualizarlos en tiempo real o en un periodo de tiempo determinado (últimas 24h, mes…). Y cada uno puede hacerse el dashboard en base al rol que haga su NS, si tiene el servicio Gateway viendo las IP’s públicas que acceden, tráficos… Si tenemos un Content Switching ídem además de destinos… Si tenemos un Load Balancing pues analizar realmente por donde van las conexiones… al final todo lo que se almacena en un log lo podemos explotar y NetScaler es un dispositivo muy interesante para conocer qué pasa dentro, como puedan ser la visualización de las autenticaciones, correctas o incorrectas.
Primero obviamente tenemos que tener la parte de Elastic Stack instalada, lo que viene siendo Logstash, Elasticsearch y Kibana; luego ya sería indicarle a nuestro NetScaler que envíe los Logs a Logstash, al puerto que nos de la gana, nos inventaremos uno (desde “System” > “Auditing” > “Syslog Auditing”. Recordar grabar la configuración en el NetScaler.
Lo siguiente será crear un fichero en Logstash que deberá de ingestar, transformar y enviar los datos. Primero escuchar en el puerto UDP indicado en el paso anterior; segundo, tratar los logs que recibe y separarlos en distintos campos para ser por último almacenados en nuestro Elasticsearch.
input { udp { type => "Netscaler" port => "1517" tags => ["Netscaler"] } } filter { if [type] == "Netscaler" { grok { match => { "message" => [ "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuario}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen2}%{SPACE}-%{SPACE}Nat_ip%{SPACE}\"Mapped Ip\"%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}Start_time%{SPACE}\"%{DATE_US:Inicio_fecha}:%{TIME:Inicio_hora}%{SPACE}\"%{SPACE}-%{SPACE}End_time%{SPACE}\"%{DATE_US:Fin_fecha}:%{TIME:Fin_hora}%{SPACE}\"%{SPACE}-%{SPACE}Duration%{SPACE}%{TIME:Duracion}%{SPACE}-%{SPACE}Http_resources_accessed%{SPACE}%{NUMBER:Http_resources_accessed}%{SPACE}-%{SPACE}NonHttp_services_accessed%{SPACE}%{NUMBER:NonHttp_services_accessed}%{SPACE}-%{SPACE}Total_TCP_connections%{SPACE}%{NUMBER:Total_TCP_connections}%{SPACE}-%{SPACE}Total_UDP_flows%{SPACE}%{NUMBER:otal_UDP_flows}%{SPACE}-%{SPACE}Total_policies_allowed%{SPACE}%{NUMBER:Total_policies_allowed}%{SPACE}-%{SPACE}Total_policies_denied%{SPACE}%{NUMBER:Total_policies_denied}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{NUMBER:Total_bytes_send}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{NUMBER:Total_bytes_recv}%{SPACE}-%{SPACE}Total_compressedbytes_send%{SPACE}%{NUMBER:Total_compressedbytes_send}%{SPACE}-%{SPACE}Total_compressedbytes_recv%{SPACE}%{NUMBER:Total_compressedbytes_recv}%{SPACE}-%{SPACE}Compression_ratio_send%{SPACE}%{NUMBER:Compression_ratio_send}\%%{SPACE}-%{SPACE}Compression_ratio_recv%{SPACE}%{NUMBER:Compression_ratio_recv}\%%{SPACE}-%{SPACE}LogoutMethod%{SPACE}\"%{DATA:LogoutMethod}\"%{SPACE}-%{SPACE}Group\(s\)%{SPACE}\"%{DATA:Group}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuario}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen2}%{SPACE}-%{SPACE}Nat_ip%{SPACE}\"Mapped Ip\"%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}User%{SPACE}%{USERNAME:Usuario}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen}%{SPACE}-%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}NatIP%{SPACE}%{IPV4:NAT_ip}:%{NUMBER:NAT_puerto}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:IP_destino}:%{NUMBER:Puerto_destino}%{SPACE}-%{SPACE}Delink%{SPACE}Time%{SPACE}%{DATE_US:Delink_fecha}:%{TIME:Delink_hora}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:Destino_ip}:%{NUMBER:Destino_puerto}%{SPACE}-%{SPACE}Start%{SPACE}Time%{SPACE}%{DATE_US:Inicio_fecha}:%{TIME:Inicio_hora}%{SPACE}-%{SPACE}End%{SPACE}Time%{SPACE}%{DATE_US:Fin_fecha}:%{TIME:Fin_hora}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}NatIP%{SPACE}%{IPV4:NAT_ip}:%{NUMBER:NAT_puerto}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:IP_destino}:%{NUMBER:Puerto_destino}%{SPACE}-%{SPACE}Delink%{SPACE}Time%{SPACE}%{DATE_US:Delink_fecha}:%{TIME:Delink_hora}%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}ClientIP%{SPACE}%{IPV4:IP_origen}%{SPACE}-%{SPACE}ClientPort%{SPACE}%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}VserverServiceIP%{SPACE}%{IPV4:vServer_ip}%{SPACE}-%{SPACE}VserverServicePort%{SPACE}%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}ClientVersion%{SPACE}%{DATA:Client_version}%{SPACE}-%{SPACE}CipherSuite%{SPACE}\"%{DATA:Cipher_suite}\"(%{SPACE}-%{SPACE}Session%{SPACE}Reuse%{SPACE}-%{SPACE}HandshakeTime%{SPACE}%{INT:Handshake_time}%{SPACE}ms)?", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}IssuerName%{SPACE}\"%{SPACE}%{DATA:Issuer_name}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}SubjectName%{SPACE}\"%{SPACE}%{DATA:Subject_name}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuario}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}%{HOSTNAME:FQDN}%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}:%{SPACE}Group\(s\)%{SPACE}%{DATA:Grupo}%{SPACE}:%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}%{DATE_US:Fecha}:%{TIME:Hora}%{SPACE}:%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}%{GREEDYDATA:Mensaje}" ] } } geoip { source => "IP_origen" target => "geoip" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float" ] convert => [ "Total_bytes_enviados", "integer" ] convert => [ "Total_bytes_recibidos", "integer" ] } } } output { if ([type]=="Netscaler"){ elasticsearch { index => "netscaler-%{+YYYY.MM.dd}" hosts => "DIRECCION_IP_ELASTICSEARCH:9200" } } }
Así que creamos por ejemplo el siguiente fichero de configuración ‘/etc/logstash/conf.d/netscaler.conf’, yo os dejo mi ejemplo aquí abajo. Mis filtros son muy básicos y de primero de Logstash, pero funcionan 😉 así que si te interesan puedes copiarlos, y si los mejoras me los pasas 🙂 Al final indicaremos en la salida el servidor de Elasticsearch donde mandar los datos y el índice donde se almacenará; recordar que si tenéis autenticación poner los parámetros de ‘username’ & ‘password’ en el output. La verdad que estoy viendo ahora el código y es bastante mejorable, me rayé con los espacios recuerdo… espero me podáis perdonar pero funciona, al menos con las últimas versiones como pueda ser NetScaler 13.0.
Una vez creado el fichero de configuración, recordar reiniciar el servicio de Logstash para recargar. Después como siempre, iremos a Kibana y una vez los datos estén entrando ya podremos ir a “Management” > “Stack management” > “Kibana” > “Index Patterns” > “Create index pattern” para crear el patrón del índice, lo dicho, como habitualmente (en este caso y sin las comillas) ‘netscaler-*’ y tendremos los datos ya en Elasticsearch almacenados de manera correcta. Ahora podríamos conectarnos desde “Discover” a nuestro índice de NetScaler y visualizar que está recogiendo datos.
Y luego ya, tras crear el índice en Kibana, ahora en nuestro Grafana deberíamos crear un “Data Source” que apunte contra nuestro Elasticsearch y el índice de NetScaler. Luego ya es dejar volar la imaginación, hacer un Dashboard con distintos Paneles, con distintos datos a visualizar, un mapamundi con las conexiones entrantes, uno de estilo Sankey para ver IPs origen/destino y el tráfico que se envían, en formato columnas, en tablas para ver datos concretos de por ejemplo los logins correctos, incorrectos, las conexiones…
Todo puede servir para que cojáis ideas y las mejoréis, con esto podréis ver qué pasa en nuestro(s) Citrix NetScaler en tiempo real, poniendo un refresco automático cada 10 segundos queda muy impresionante, también nos servirá para analizar el resumen de una jornada, o conocer cuando algo extraño sucede… Como siempre, gracias a todos y más a los que movéis este tipo de contenidos en redes sociales 😉