在 Elasticsearch 中收集 NetScaler LOG 并在 Grafana 中可视化它们

今天我们有一个非常有趣的菜单, 如果您负责基础架构中的 NetScaler, 在这篇文章中,我们将了解如何收集他们的日志以便稍后进行处理,并最终在 Grafana 中制作一个实时显示的仪表板 (或 Historical) 收集的数据.

 

所以我说的, 我们可以让我们的 Citrix NetScaler 将其日志发送到 Logstash 进行处理,并将其分离为结构化数据,即来自不同日志的行. 我们将此信息存储在 Elasticsearch 索引中,然后使用连接器或“数据源”’ 我们将能够访问这些数据, 实时或在特定时间段内查看 (最近 24 小时, 月…). 每个人都可以根据他们的 NS 所扮演的角色制作仪表板, 如果您让网关服务看到访问, 业务… 如果除了目标之外,我们还有内容切换 ditto… 如果我们有负载均衡,那么就真正分析连接的去向…  最后,存储在日志中的所有内容都可以被利用,而 NetScaler 是一个非常有趣的设备,可以了解内部发生的事情, 例如身份验证的显示, 正确或错误.

 

首先,我们显然必须安装 Elastic Stack 部分, 什么 Logstash, Elasticsearch琵琶; 然后,它将指示我们的 NetScaler 将日志发送到 Logstash, 前往我们选择的港口, 我们将发明一个 (因为 “系统” > “审计” > “Syslog 审计”. 请记住将配置保存到 NetScaler.

 

接下来,在 Logstash 中创建一个需要摄取的文件, 转换和发送数据. 首先侦听上一步中指示的 UDP 端口; 第二, 处理它收到的日志,并将它们分成不同的字段,最终存储在我们的 Elasticsearch 中.

 

输入 {
        UDP 协议 {
                类型 => "Netscaler 公司"
                端口 => "1517"
                标签 => ["Netscaler 公司"]
        }
}

滤波器 {

        如果 [类型] == "Netscaler 公司" {
                        格罗克 {
                            匹配 => { "消息" => [
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}上下文%{空间}%{用户名:用户}@%{IPV4:IP_origen}%{空间}-%{空间}会话 ID:%{空间}%{INT 系列:Session_id}%{空间}-%{空间}用户%{空间}%{用户名:用户 2}%{空间}-%{空间}Client_ip%{空间}%{IPV4:IP_origen2}%{空间}-%{空间}Nat_ip%{空间}\"映射的 Ip"%{空间}-%{空间}Vserver%{空间}%{IPV4:vServer_ip}:%{数:vServer_puerto}%{空间}-%{空间}Start_time%{空间}\"%{DATE_US:Inicio_fecha}:%{时间:Inicio_hora}%{空间}\"%{空间}-%{空间}End_time%{空间}\"%{DATE_US:Fin_fecha}:%{时间:Fin_hora}%{空间}\"%{空间}-%{空间}Duration%{空间}%{时间:期间}%{空间}-%{空间}Http_resources_accessed%{空间}%{数:Http_resources_accessed}%{空间}-%{空间}NonHttp_services_accessed%{空间}%{数:NonHttp_services_accessed}%{空间}-%{空间}Total_TCP_connections%{空间}%{数:Total_TCP_connections}%{空间}-%{空间}Total_UDP_flows%{空间}%{数:otal_UDP_flows}%{空间}-%{空间}Total_policies_allowed%{空间}%{数:Total_policies_allowed}%{空间}-%{空间}Total_policies_denied%{空间}%{数:Total_policies_denied}%{空间}-%{空间}Total_bytes_send%{空间}%{数:Total_bytes_send}%{空间}-%{空间}Total_bytes_recv%{空间}%{数:Total_bytes_recv}%{空间}-%{空间}Total_compressedbytes_send%{空间}%{数:Total_compressedbytes_send}%{空间}-%{空间}Total_compressedbytes_recv%{空间}%{数:Total_compressedbytes_recv}%{空间}-%{空间}Compression_ratio_send%{空间}%{数:Compression_ratio_send}\%%{空间}-%{空间}Compression_ratio_recv%{空间}%{数:Compression_ratio_recv}\%%{空间}-%{空间}LogoutMethod%{空间}\"%{数据:LogoutMethod}\"%{空间}-%{空间}Group\(s\)%{空间}\"%{数据:Group}\"",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}上下文%{空间}%{用户名:用户}@%{IPV4:IP_origen}%{空间}-%{空间}会话 ID:%{空间}%{INT 系列:Session_id}%{空间}-%{空间}用户%{空间}%{用户名:用户 2}%{空间}-%{空间}Client_ip%{空间}%{IPV4:IP_origen2}%{空间}-%{空间}Nat_ip%{空间}\"映射的 Ip"%{空间}-%{空间}Vserver%{空间}%{IPV4:vServer_ip}:%{数:vServer_puerto}%{空间}-%{空间}%{GREEDYDATA:消息}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}用户%{空间}%{用户名:用户}%{空间}-%{空间}Client_ip%{空间}%{IPV4:IP_origen}%{空间}-%{空间}%{GREEDYDATA:消息}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}Source%{空间}%{IPV4:IP_origen}:%{数:Puerto_origen}%{空间}-%{空间}Vserver%{空间}%{IPV4:vServer_ip}:%{数:vServer_puerto}%{空间}-%{空间}NatIP%{空间}%{IPV4:NAT_ip}:%{数:NAT_puerto}%{空间}-%{空间}Destination%{空间}%{IPV4:IP_destino}:%{数:Puerto_destino}%{空间}-%{空间}Delink%{空间}Time%{空间}%{DATE_US:Delink_fecha}:%{时间:Delink_hora}%{空间}-%{空间}Total_bytes_send%{空间}%{INT 系列:Total_bytes_enviados}%{空间}-%{空间}Total_bytes_recv%{空间}%{INT 系列:Total_bytes_recibidos}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}Source%{空间}%{IPV4:IP_origen}:%{数:Puerto_origen}%{空间}-%{空间}Destination%{空间}%{IPV4:Destino_ip}:%{数:Destino_puerto}%{空间}-%{空间}Start%{空间}Time%{空间}%{DATE_US:Inicio_fecha}:%{时间:Inicio_hora}%{空间}-%{空间}End%{空间}Time%{空间}%{DATE_US:Fin_fecha}:%{时间:Fin_hora}%{空间}-%{空间}Total_bytes_send%{空间}%{INT 系列:Total_bytes_enviados}%{空间}-%{空间}Total_bytes_recv%{空间}%{INT 系列:Total_bytes_recibidos}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}Source%{空间}%{IPV4:IP_origen}:%{数:Puerto_origen}%{空间}-%{空间}Vserver%{空间}%{IPV4:vServer_ip}:%{数:vServer_puerto}%{空间}-%{空间}NatIP%{空间}%{IPV4:NAT_ip}:%{数:NAT_puerto}%{空间}-%{空间}Destination%{空间}%{IPV4:IP_destino}:%{数:Puerto_destino}%{空间}-%{空间}Delink%{空间}Time%{空间}%{DATE_US:Delink_fecha}:%{时间:Delink_hora}%{空间}Total_bytes_send%{空间}%{INT 系列:Total_bytes_enviados}%{空间}-%{空间}Total_bytes_recv%{空间}%{INT 系列:Total_bytes_recibidos}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}SPCBId%{空间}%{INT 系列:SPCBId}%{空间}-%{空间}ClientIP%{空间}%{IPV4:IP_origen}%{空间}-%{空间}ClientPort%{空间}%{数:Puerto_origen}%{空间}-%{空间}VserverServiceIP%{空间}%{IPV4:vServer_ip}%{空间}-%{空间}VserverServicePort%{空间}%{数:vServer_puerto}%{空间}-%{空间}客户端版本%{空间}%{数据:Client_version}%{空间}-%{空间}CipherSuite 百分比{空间}\"%{数据:Cipher_suite}\"(%{空间}-%{空间}会话%{空间}重用%{空间}-%{空间}握手时间%{空间}%{INT 系列:Handshake_time}%{空间}女士)?",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}SPCBId%{空间}%{INT 系列:SPCBId}%{空间}-%{空间}发行者名称%{空间}\"%{空间}%{数据:Issuer_name}\"",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}SPCBId%{空间}%{INT 系列:SPCBId}%{空间}-%{空间}主题名称%{空间}\"%{空间}%{数据:Subject_name}\"",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}上下文%{空间}%{用户名:用户}@%{IPV4:IP_origen}%{空间}-%{空间}会话 ID:%{空间}%{INT 系列:Session_id}%{空间}-%{空间}%{主机名:FQDN (英语)}%{空间}用户%{空间}%{用户名:用户 2}%{空间}:%{空间}Group\(s\)%{空间}%{数据:群}%{空间}:%{空间}Vserver%{空间}%{IPV4:vServer_ip}:%{数:vServer_puerto}%{空间}-%{空间}%{DATE_US:飞驰}:%{时间:小时}%{空间}:%{空间}%{GREEDYDATA:消息}",
                              "^<%{POSINT:syslog_pri}>%{空间}%{DATE_US:Log_fecha}:%{时间:Log_hora}%{空间}%{SYSLOG 主机:NSlog_hostname}%{空间}0-EPP-0%{空间}:%{空间}默认%{空间}%{词:Log_tipo}%{空间}%{词:Log_evento}%{空间}%{INT 系列:Log_id}%{空间}0%{空间}:%{空间}%{GREEDYDATA:消息}"
                                       ]
                            }
                        }

                        Geoip {
                          来源 => "IP_origen"
                          目标 => "Geoip"
                          add_field => [ "[Geoip][坐标]", "%{[Geoip][经度]}" ]
                          add_field => [ "[Geoip][坐标]", "%{[Geoip][纬度]}"  ]
                        }

                        静音 {
                          转换 => [ "[Geoip][坐标]", "浮" ]
                          转换 => [ "Total_bytes_enviados", "整数" ]
                          转换 => [ "Total_bytes_recibidos", "整数" ]
                        }
        }
}

输出 {

        如果 ([类型]=="Netscaler 公司"){

                ElasticSearch 服务 {
                   索引 => "NetScaler-%{+YYYY.MM.dd}"
                   主机=> "DIRECCION_IP_ELASTICSEARCH:9200"
                }

        }
}

因此,让我们创建以下配置文件 '/etc/logstash/conf.d/netscaler.conf', 我在下面留下我的例子. 我的过滤器非常基本,首先是 Logstash, 但它们有效😉,因此如果您有兴趣,可以复制它们, 如果你改进它们,就把它们传🙂给我最后,我们将在输出中指示 Elasticsearch 服务器将数据发送到何处以及将数据存储的索引; 请记住,如果您有身份验证,请输入 'username' 参数’ & '密码’ 在输出中. 事实是,我现在正在查看代码,它可以进行改进, 我用我记得的空白处划伤自己… 我希望你能原谅我,但它有效, 至少使用 NetScaler 等最新版本 13.0.

 

创建配置文件后, 请记得重启 Logstash 服务以重新加载. 事后一如既往, 我们将转到 Kibana,一旦数据进入,我们就可以转到 “管理” > “堆栈管理” > “琵琶” > “索引模式” > “创建索引模式” 创建索引模式, 正如我所说, 照常 (在这种情况下,不带引号) 'netscaler-*’ 我们将将数据正确存储在 Elasticsearch 中. 现在我们可以从 “发现” 添加到我们的 NetScaler 索引中,并可视化它正在收集数据.

 

然后, 在 Kibana 中创建索引后, 现在在我们的 Grafana 中,我们应该创建一个 “数据源” 它以我们的 Elasticsearch 和 NetScaler 索引为目标. 然后让你的想象力自由驰骋, 制作具有不同 Dashboard 的 Dashboard, 使用不同的数据进行可视化, 一 世界地图 使用传入连接, 风格之一 桑基 查看源/目标 IP 和正在发送的流量, 列格式, 在 僵局 要查看特定数据,例如正确的登录信息, 不對, 联系…

一切都可以用来让你思考和改进它们, 有了这个,您将能够看到我们的(s) Citrix NetScaler 实时, 每 个 10 seconds 非常令人印象深刻, 它还将帮助我们分析一天的总结, 或者知道什么时候发生奇怪的事情… 照常, 感谢所有在社交网络😉上移动此类内容的人

 

推荐文章

作者

nheobug@bujarra.com
Autor del blog Bujarra.com Cualquier necesidad que tengas, 请随时与我联系, 我会尽我所能帮助你, 分享就是生活 ;) . 享受文档!!!