🧑‍🏫
liualexiang
  • Introduction
  • Azure
    • AKS Basic
    • AKS Spark
    • AZ ACR SYNC
    • Azure CMI SDWAN
    • Azure LB DeepDive
      • Azure LB DeepDive
    • Azure Service Principal Basic
    • Azure Internal VM Network Connectivity
      • Azure Internal VM Network Connectivity
    • Azure Cli Build
    • Azure Vm Memory Monitor
  • Blockchain
    • BTC
  • CRISPR
    • 使用Parallel_Cluster提升CRISPA效率
  • OpenSource
    • ElasticSearch
      • ES Get Started
      • ES Search Query
      • Kibana 可视化
      • Logstash配置
    • Ansible 基础
    • Infra As Code
      • Pulumi Get Started
      • Terraform Basic
    • ZooKeeper 基础
    • RPC与REST
    • 使用Python申请大量内存测试
    • 使用TPC_DS产生压测数据
    • Superset
      • Superset部署手册
    • 代码扫描
    • Git
      • Git Basic
      • Github Action Basic
      • Gitlab与AzureAD集成
      • Gitbook 基础教程
    • K8S
      • enter_node
      • K8s X509 Client Cert
      • K8s Basic
      • K8s Oidc
      • Docker 基础
      • helm基础
      • K8S_Secrets管理
      • 深入了解K8S
      • 混沌工程
      • Istio
      • 生态
      • CRD开发
      • k8s网络
    • Cloud_Custodian
    • Jenkins Basic
    • Nginx
    • ETCD
    • 正则
    • VictoriaMetrics
    • Kafka
  • MySQL
    • MySQL 调优
  • Linux
    • SSH Tunnel 上网
    • 内存管理
    • 在Linux系统中通过LUKS加密磁盘
    • 量子计算 Basic
    • IO多路复用
    • Iptables
    • tmux和screen
    • Systemd
    • OS 基础
    • jq基础
    • yum
    • neovim
  • Web
    • Html Css
    • Web部署
    • 缓存
  • Programming
    • 算法
      • 返回list中最大生序子序列长度
    • Python技巧
      • Python的语法糖
      • Python常用装饰器
      • AsyncIO基础
      • 自动化测试pytest
      • python中的下划线
      • 面向对象
      • Python的坑
      • Python配置文件管理
      • HTTP Stream Response
      • Python项目管理
    • 设计模式
      • 设计模式
      • 面向对象的思想
      • 编程概念
    • Go
      • Go 基础
      • Go常用功能
      • 结构体入门
    • 前端
    • Vue
    • NodeJS
  • Math
    • 多项式插值法
  • Security
    • HTTP常见攻击
    • 加密与签名
    • RSA
    • ECDSA
  • Solidity
    • Solidity基础
    • Blockchain Testnet Faucet
  • Tools
    • 视频处理ffmpeg
    • IDE配置
    • iTerm2美化
    • 密码管理
    • FRP配置
    • 工具集
由 GitBook 提供支持
在本页
  • 插件安装
  • 有关Logstash的配置解读
  • filter Grok的示例
  • 解读Filter常用部分,也是对logstash配置的关键部分
  1. OpenSource
  2. ElasticSearch

Logstash配置

插件安装

./logstash-plugin list
./logstash-plugin install logstash-input-azureblob

有关Logstash的配置解读

  • Logstash 数据处理的三大步骤: Input --> Filter --> Output

  • 如果logstash要处理多个业务的数据,可以用 logstash pipeline,每一个业务放到单独的pipeline文件夹下,一个pipeline文件夹一般只有一个input,output一般也只1个(当然如果业务想写到多个地方,也可以配置多个)。同一个业务里,如果有不同的服务,可以用不同的文件来处理 filter里的逻辑,然后将 add_field 加上一个字段表示这个服务

filter Grok的示例

下面 filter Grok的示例中,其中第一部分 ?<api_path>/api/v1/\S+) 表示的是使用正则进行处理,正则判断的是 /api/v1/xxx的这种路径,并将搜索到的结果保存到 api_path 中,这个结果可以再 output 中,通过 "%{[api_path]}" 的方式进行引用。

%{WORD} 是使用 grok 里的WORD pattern进行查找; BASE10NUM则是搜索数字,后面的:int指的是将数据类型转换为 int,默认是string类型(常用的还有 float, boolean, epoch,一共仅此4种);NUMBER是在BASE10NUM 搜索的基础上,再加上对科学计数法的支持;最后的 (?<unit>(s|ms|us)) 指的是将 s 或者 ms,或者us 保存到 unit里

再if 语句后面有一个 else {} 表示,如果grok匹配不到,则删除

最后if _grokparsefailure 指的是,对于 grok匹配不到的数据,一般会加上这个tag,然后依然保留这个数据,我们通过将这些数据删除,能确保后续处理的数据,一定是grok 匹配到的


filter {
  if "/my-log-group" in [logGroup] {
    grok {
      match => {
          "message" => "(?<api_path>\/api\/v1\/\S+) %{WORD} %{BASE10NUM:code:int} %{NUMBER:duration}(?<unit>(s|ms|us))"
      }
    }
  }
else {
  drop {}
  }
if "_grokparsefailure" in [tags] {
  drop {}
  }
}

不过在上述的示例种,也不免发现,由于时间单位,既有秒,又有毫秒,还有微秒,我们在存储的时候,希望时间统一。此时最好的办法是让开发写日志的时候统一起来。如果开发没有处理,我们也可以通过嵌入ruby脚本来完成。比如:

filter {
  if "/my-log-group" in [logGroup] {
    grok {
      match => {
        "message" => "(?<api_path>\/api\/v1\/\S+) %{WORD} %{BASE10NUM:code:int} %{NUMBER:duration}(?<unit>(s|ms|us))"
      }
    }
    ruby {
      init => "
        def to_milliseconds(value, unit)
          case unit
          when 'ms'
            value
          when 's'
            value * 1_000
          when 'us'
            value / 1_000.0
          else
            value
          end
        end
      "
      code => "event.set('duration', to_milliseconds(event.get('duration').to_f, event.get('unit')))"
    }
  } else {
    drop {}
  }
  
if "_grokparsefailure" in [tags] {
  drop{}
  }
}

解读Filter常用部分,也是对logstash配置的关键部分

  • 在filter里,一般可以用 json { source => "message"} 来对 message字段进行展开。如果 message是一个list,想要对list展开,则要用 split

  • filter 部分的split

    • 这部分的split主要是用来将JSON文件中的list数据拆开,建议将所有有关list的都拆出来。之所以有4个split,是因为有4个list (records最外侧一层,properties.flows一层,flows.flows一层,flows.flowTuples一层),从外层到内层注意拆解。

      • 示例 split { field => "[records]" }

  • mutate里面的split

    • 这部分split主要是将在filter中拆出来的每一个元素,通过某些特定的字段再次拆解,如将ResourceID按"/"进行拆分,将 "flowTuples" 里面的每一行(之所以每一行,是因为filter中已经split过)按","进行拆分,要读取拆分后的数据,则按[0],[1][...] 这种顺序读取。示例: split => { "[records][resourceId]" => "/"}

    • mutate 里面的split需要用 =>,filter里面的split直接跟 {}

    • add_field 添加需要的字段,这个主要是将mutate里面split之后的结果给添加到要输出的字段中(list索引从0开始)。毕竟split的最终目的就是为了提取有效字段,add_field的重要性可想而知。示例: add_field => { "Subscription" => "%{[records][resourceId][2]}"}

    • convert 将数据转换为需要的数据类型,示例: convert => {"Subscription" => "string"}

    • logstash默认已经有geoip的插件,但需要指定下GeoLite2数据库

    • source 后面指定要处理哪个字段作为ip地址(字段名字可以在上述add_field中指定)

    • target 后面跟要将查询结果保存到哪个字段中,如指定 geoip,则对于查询的城市,字段名为 geoip.city_name

    • add_field 可以添加字段,比如可以通过查询经纬度信息,将其添加到geoip.coordinates 这个list中

    • remove_field 默认情况下,如果不加说明,会打印所有字段,通过remove_field 可以移除指定字段

    • fields 这个可以直接指定要显示哪些字段(如果要对经纬度数据合并,那么fields里面要有经纬度的字段,才能用add_field,如果没有fields字段,则说明显示所有)

    • 如果在ES里面,想要直接识别经纬度坐标,那么需要将经纬度保存成float类型。同时需要创建ES的index的时候,指定这个字段类型为 geo_point.

  • 通过date 对时间日期处理,比如将unixtimestamp指定为unix时间戳: date { match => ["unixtimestamp" , "UNIX"] }

  • output 的定义

    • output根据需要,可以输出到redis, ES等,一般为debug方便,也可以输出为 stdout(),但在stdout中,一般指定为 rubydebug 为解码方式,这样输出的格式比较友好,方便debug.

    • 示例: output { stdout { codec => rubydebug } }

参考文档:https://docs.microsoft.com/zh-cn/azure/network-watcher/network-watcher-read-nsg-flow-logs

上一页Kibana 可视化下一页Ansible 基础

最后更新于5个月前

对于ip处理的的插件

geoip