
Recopilant els LOGs de NetScaler a Elasticsearch i visualitzant-los a Grafana
Hoy tenemos un menú bastante interesante, y si es que si tienes a tu cargo algún NetScaler en tu infraestructura, en este post veremos cómo podemos recopilar sus logs para después ser tratados y finalmente hacernos un dashboard en Grafana que muestre en tiempo real (o histórico) los datos recopilados.
Així que el que s' ha dit, podemos hacer que nuestro Citrix NetScaler envíe sus logs a Logstash para que los trate y separe en datos estructurados lo que son líneas de distintos logs. Almacenaremos en un índice de Elasticsearch esta información y luego con un conector o ‘Data Source’ de Grafana podremos acceder a estos datos, para visualizarlos en tiempo real o en un periodo de tiempo determinado (últimas 24h, mes…). Y cada uno puede hacerse el dashboard en base al rol que haga su NS, si tiene el servicio Gateway viendo las IP’s públicas que acceden, tráficos… Si tenemos un Content Switching ídem además de destinos… Si tenemos un Load Balancing pues analizar realmente por donde van las conexiones… al final todo lo que se almacena en un log lo podemos explotar y NetScaler es un dispositivo muy interesante para conocer qué pasa dentro, como puedan ser la visualización de las autenticaciones, correctas o incorrectas.
Primero obviamente tenemos que tener la parte de Elastic Stack instalada, el que ve sent Logstash, Elasticsearch y Kibana; luego ya sería indicarle a nuestro NetScaler que envíe los Logs a Logstash, al puerto que nos de la gana, nos inventaremos uno (des de “System” > “Auditing” > “Syslog Auditing”. Recordar grabar la configuración en el NetScaler.
Lo siguiente será crear un fichero en Logstash que deberá de ingestar, transformar y enviar los datos. Primero escuchar en el puerto UDP indicado en el paso anterior; segon, tratar los logs que recibe y separarlos en distintos campos para ser por último almacenados en nuestro Elasticsearch.
input { udp { type => "Netscaler" port => "1517" tags => ["Netscaler"] } } filter { if [type] == "Netscaler" { grok { match => { "message" => [ "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuari}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen2}%{SPACE}-%{SPACE}Nat_ip%{SPACE}\"Mapped Ip\"%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}Start_time%{SPACE}\"%{DATE_US:Inicio_fecha}:%{TIME:Inicio_hora}%{SPACE}\"%{SPACE}-%{SPACE}End_time%{SPACE}\"%{DATE_US:Fin_fecha}:%{TIME:Fin_hora}%{SPACE}\"%{SPACE}-%{SPACE}Duration%{SPACE}%{TIME:Duracion}%{SPACE}-%{SPACE}Http_resources_accessed%{SPACE}%{NUMBER:Http_resources_accessed}%{SPACE}-%{SPACE}NonHttp_services_accessed%{SPACE}%{NUMBER:NonHttp_services_accessed}%{SPACE}-%{SPACE}Total_TCP_connections%{SPACE}%{NUMBER:Total_TCP_connections}%{SPACE}-%{SPACE}Total_UDP_flows%{SPACE}%{NUMBER:otal_UDP_flows}%{SPACE}-%{SPACE}Total_policies_allowed%{SPACE}%{NUMBER:Total_policies_allowed}%{SPACE}-%{SPACE}Total_policies_denied%{SPACE}%{NUMBER:Total_policies_denied}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{NUMBER:Total_bytes_send}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{NUMBER:Total_bytes_recv}%{SPACE}-%{SPACE}Total_compressedbytes_send%{SPACE}%{NUMBER:Total_compressedbytes_send}%{SPACE}-%{SPACE}Total_compressedbytes_recv%{SPACE}%{NUMBER:Total_compressedbytes_recv}%{SPACE}-%{SPACE}Compression_ratio_send%{SPACE}%{NUMBER:Compression_ratio_send}\%%{SPACE}-%{SPACE}Compression_ratio_recv%{SPACE}%{NUMBER:Compression_ratio_recv}\%%{SPACE}-%{SPACE}LogoutMethod%{SPACE}\"%{DATA:LogoutMethod}\"%{SPACE}-%{SPACE}Group\(s\)%{SPACE}\"%{DATA:Group}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuari}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen2}%{SPACE}-%{SPACE}Nat_ip%{SPACE}\"Mapped Ip\"%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}User%{SPACE}%{USERNAME:Usuari}%{SPACE}-%{SPACE}Client_ip%{SPACE}%{IPV4:IP_origen}%{SPACE}-%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}NatIP%{SPACE}%{IPV4:NAT_ip}:%{NUMBER:NAT_puerto}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:IP_destino}:%{NUMBER:Puerto_destino}%{SPACE}-%{SPACE}Delink%{SPACE}Time%{SPACE}%{DATE_US:Delink_fecha}:%{TIME:Delink_hora}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:Destino_ip}:%{NUMBER:Destino_puerto}%{SPACE}-%{SPACE}Start%{SPACE}Time%{SPACE}%{DATE_US:Inicio_fecha}:%{TIME:Inicio_hora}%{SPACE}-%{SPACE}End%{SPACE}Time%{SPACE}%{DATE_US:Fin_fecha}:%{TIME:Fin_hora}%{SPACE}-%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Source%{SPACE}%{IPV4:IP_origen}:%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}NatIP%{SPACE}%{IPV4:NAT_ip}:%{NUMBER:NAT_puerto}%{SPACE}-%{SPACE}Destination%{SPACE}%{IPV4:IP_destino}:%{NUMBER:Puerto_destino}%{SPACE}-%{SPACE}Delink%{SPACE}Time%{SPACE}%{DATE_US:Delink_fecha}:%{TIME:Delink_hora}%{SPACE}Total_bytes_send%{SPACE}%{INT:Total_bytes_enviados}%{SPACE}-%{SPACE}Total_bytes_recv%{SPACE}%{INT:Total_bytes_recibidos}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}ClientIP%{SPACE}%{IPV4:IP_origen}%{SPACE}-%{SPACE}ClientPort%{SPACE}%{NUMBER:Puerto_origen}%{SPACE}-%{SPACE}VserverServiceIP%{SPACE}%{IPV4:vServer_ip}%{SPACE}-%{SPACE}VserverServicePort%{SPACE}%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}ClientVersion%{SPACE}%{DATA:Client_version}%{SPACE}-%{SPACE}CipherSuite%{SPACE}\"%{DATA:Cipher_suite}\"(%{SPACE}-%{SPACE}Session%{SPACE}Reuse%{SPACE}-%{SPACE}HandshakeTime%{SPACE}%{INT:Handshake_time}%{SPACE}ms)?", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}IssuerName%{SPACE}\"%{SPACE}%{DATA:Issuer_name}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}SPCBId%{SPACE}%{INT:SPCBId}%{SPACE}-%{SPACE}SubjectName%{SPACE}\"%{SPACE}%{DATA:Subject_name}\"", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}Context%{SPACE}%{USERNAME:Usuari}@%{IPV4:IP_origen}%{SPACE}-%{SPACE}SessionId:%{SPACE}%{INT:Session_id}%{SPACE}-%{SPACE}%{HOSTNAME:FQDN}%{SPACE}User%{SPACE}%{USERNAME:Usuario2}%{SPACE}:%{SPACE}Group\(s\)%{SPACE}%{DATA:Grup}%{SPACE}:%{SPACE}Vserver%{SPACE}%{IPV4:vServer_ip}:%{NUMBER:vServer_puerto}%{SPACE}-%{SPACE}%{DATE_US:Data}:%{TIME:Hora}%{SPACE}:%{SPACE}%{GREEDYDATA:Mensaje}", "^<%{POSINT:syslog_pri}>%{SPACE}%{DATE_US:Log_fecha}:%{TIME:Log_hora}%{SPACE}%{SYSLOGHOST:NSlog_hostname}%{SPACE}0-PPE-0%{SPACE}:%{SPACE}default%{SPACE}%{WORD:Log_tipo}%{SPACE}%{WORD:Log_evento}%{SPACE}%{INT:Log_id}%{SPACE}0%{SPACE}:%{SPACE}%{GREEDYDATA:Mensaje}" ] } } geoip { source => "IP_origen" target => "geoip" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float" ] convert => [ "Total_bytes_enviados", "integer" ] convert => [ "Total_bytes_recibidos", "integer" ] } } } output { if ([type]=="Netscaler"){ elasticsearch { index => "netscaler-%{+YYYY.MM.dd}" hosts => "DIRECCION_IP_ELASTICSEARCH:9200" } } }
Así que creamos por ejemplo el siguiente fichero de configuración ‘/etc/logstash/conf.d/netscaler.conf’, yo os dejo mi ejemplo aquí abajo. Mis filtros son muy básicos y de primero de Logstash, pero funcionan 😉 así que si te interesan puedes copiarlos, y si los mejoras me los pasas 🙂 Al final indicaremos en la salida el servidor de Elasticsearch donde mandar los datos y el índice donde se almacenará; recordar que si tenéis autenticación poner los parámetros de ‘username’ & ‘password’ en el output. La verdad que estoy viendo ahora el código y es bastante mejorable, me rayé con los espacios recuerdo… espero me podáis perdonar pero funciona, al menos con las últimas versiones como pueda ser NetScaler 13.0.
Un cop creat el fitxer de configuració, recordar reiniciar el servicio de Logstash para recargar. Después como siempre, iremos a Kibana y una vez los datos estén entrando ya podremos ir a “Management” > “Stack management” > “Kibana” > “Index Patterns” > “Create index pattern” para crear el patrón del índice, el que s' ha dit, com habitualment (en aquest cas i sense les cometes) ‘netscaler-*’ y tendremos los datos ya en Elasticsearch almacenados de manera correcta. Ahora podríamos conectarnos desde “Discover” a nuestro índice de NetScaler y visualizar que está recogiendo datos.
Y luego ya, tras crear el índice en Kibana, ahora en nuestro Grafana deberíamos crear un “Data Source” que apunte contra nuestro Elasticsearch y el índice de NetScaler. Després ja és deixar volar la imaginació, fer un Dashboard amb diferents Panells, amb diferents dades a visualitzar, un mapamundi con las conexiones entrantes, uno de estilo Sankey para ver IPs origen/destino y el tráfico que se envían, en formato columnas, en taules para ver datos concretos de por ejemplo los logins correctos, incorrectes, las conexiones…
Todo puede servir para que cojáis ideas y las mejoréis, con esto podréis ver qué pasa en nuestro(s) Citrix NetScaler en tiempo real, posant un refresc automàtic cada 10 segundos queda muy impresionante, también nos servirá para analizar el resumen de una jornada, o conocer cuando algo extraño sucede… Com sempre, gracias a todos y más a los que movéis este tipo de contenidos en redes sociales 😉