preface

vFlow was the first open source project I looked at when I started working on flow collection. In the end we did not use it, but it is an enterprise-class project that handles protocols such as NetFlow, and there was a lot to explore.

features

  • IPFIX RFC7011 collector
  • sFlow v5 raw header packet collector
  • NetFlow v9 collector
  • Decoding sFlow raw header L2/L3/L4
  • Produce to Apache Kafka, NSQ, NATS
  • Replicate IPFIX to 3rd party collector
  • Supports IPv4 and IPv6
  • Monitoring with InfluxDB and OpenTSDB backend

architecture

Dynamic pool

A worker pool is provided, and the number of workers is adjusted dynamically based on the number of messages waiting in the buffer, as described in the previous article.
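The idea can be sketched roughly as follows. This is not vFlow's actual pool code: the `pool` type, the `desiredWorkers` policy (one worker per `perWorker` queued messages, clamped to a minimum and maximum) and all the numbers are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// desiredWorkers maps the current backlog to a worker count.
// Hypothetical policy: one worker per perWorker queued messages,
// clamped to the range [min, max].
func desiredWorkers(backlog, perWorker, min, max int) int {
	n := (backlog + perWorker - 1) / perWorker // ceiling division
	if n < min {
		return min
	}
	if n > max {
		return max
	}
	return n
}

// pool drains jobs with a variable number of workers.
// resize must be called from a single goroutine.
type pool struct {
	jobs   chan []byte
	handle func([]byte)
	stops  []chan struct{} // one stop channel per running worker
	wg     sync.WaitGroup
}

// resize starts or stops workers until exactly n are running.
func (p *pool) resize(n int) {
	for len(p.stops) < n {
		stop := make(chan struct{})
		p.stops = append(p.stops, stop)
		p.wg.Add(1)
		go p.worker(stop)
	}
	for len(p.stops) > n {
		last := len(p.stops) - 1
		close(p.stops[last])
		p.stops = p.stops[:last]
	}
}

// worker handles messages until its stop channel is closed.
func (p *pool) worker(stop chan struct{}) {
	defer p.wg.Done()
	for {
		select {
		case <-stop:
			return
		case msg := <-p.jobs:
			p.handle(msg)
		}
	}
}

func main() {
	var processed int64
	var done sync.WaitGroup
	p := &pool{
		jobs: make(chan []byte, 100),
		handle: func(msg []byte) {
			atomic.AddInt64(&processed, 1)
			done.Done()
		},
	}

	// Queue 50 messages before any worker is running.
	for i := 0; i < 50; i++ {
		done.Add(1)
		p.jobs <- []byte("flow record")
	}

	// Size the pool from the backlog, drain it, then shut down.
	n := desiredWorkers(len(p.jobs), 10, 1, 8)
	p.resize(n)
	done.Wait()
	p.resize(0)
	p.wg.Wait()

	fmt.Println("workers:", n, "processed:", atomic.LoadInt64(&processed))
}
```

In a long-running collector, a sizing step like this would typically run on a timer, so the pool grows under bursts and shrinks again when the buffer empties.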

Plug-in architecture

vFlow supports the Kafka, NSQ, and NATS message queues, and it is easy to add a producer for other message queues, such as RabbitMQ.

    package producer

    import (
        "log"
        "sync"
    )

    // Producer represents messaging queue
    type Producer struct {
        MQ           MQueue
        MQConfigFile string
        MQErrorCount *uint64

        Topic string
        Chan  chan []byte

        Logger *log.Logger
    }

    // MQueue represents messaging queue methods
    type MQueue interface {
        setup(string, *log.Logger) error
        inputMsg(string, chan []byte, *uint64)
    }

    // NewProducer constructs new Messaging Queue
    func NewProducer(mqName string) *Producer {
        var mqRegistered = map[string]MQueue{
            "kafka":     new(Kafka),
            "nsq":       new(NSQ),
            "nats":      new(NATS),
            "rawSocket": new(RawSocket),
        }

        return &Producer{
            MQ: mqRegistered[mqName],
        }
    }

    // Run configs and tries to be ready to produce
    func (p *Producer) Run() error {
        var (
            wg  sync.WaitGroup
            err error
        )

        err = p.MQ.setup(p.MQConfigFile, p.Logger)
        if err != nil {
            return err
        }

        wg.Add(1)
        go func() {
            defer wg.Done()
            topic := p.Topic
            p.MQ.inputMsg(topic, p.Chan, p.MQErrorCount)
        }()

        wg.Wait()

        return nil
    }

    // Shutdown stops the producer
    func (p *Producer) Shutdown() {
        close(p.Chan)
    }

To support another message queue, implement the MQueue interface and register the new type in NewProducer.

Extending to more protocols

vFlow currently supports the sFlow, IPFIX, and NetFlow protocols; for cFlow and NetStream we can extend the implementation ourselves. I have added a NetStream module. In vflow.go, add the corresponding flow workers, as follows:

    // The version is too old
    netflow5 := NewNetFlowV5()
    netstream5 := NewNetStreamV5()

    sFlow := NewSFlow()
    ipfix := NewIPFIX()
    netflow9 := NewNetflowV9()
    netstream9 := NewNetStreamV9()

    protos[0] = sFlow
    protos[1] = ipfix
    protos[2] = netflow9
    protos[3] = netstream9
    protos[4] = netflow5
    protos[5] = netstream5

    for _, p := range protos {
        wg.Add(1)
        go func(p proto) {
            defer wg.Done()
            p.run()
        }(p)
    }
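The launch loop above only needs each worker to expose a run method. The sketch below shows that pattern in isolation. It assumes `proto` is an interface with a single `run()` method (the real interface in vflow.go may require more), and `stubWorker` and `runAll` are hypothetical names standing in for a real collector such as the NetStream module.

```go
package main

import (
	"fmt"
	"sync"
)

// proto is assumed to be anything with a run method.
type proto interface {
	run()
}

// stubWorker stands in for a protocol collector. A real worker would
// open a UDP listener and decode packets inside run.
type stubWorker struct {
	name    string
	started chan<- string
}

// run reports that the worker was started and then returns.
func (s stubWorker) run() {
	s.started <- s.name
}

// runAll starts every worker in its own goroutine and waits for all
// of them to return, using the same loop shape as vflow.go.
func runAll(protos []proto) {
	var wg sync.WaitGroup
	for _, p := range protos {
		wg.Add(1)
		go func(p proto) {
			defer wg.Done()
			p.run()
		}(p)
	}
	wg.Wait()
}

func main() {
	started := make(chan string, 2)
	protos := []proto{
		stubWorker{"netflow9", started},
		stubWorker{"netstream9", started},
	}

	runAll(protos)
	close(started)

	// The order depends on goroutine scheduling.
	for name := range started {
		fmt.Println("started:", name)
	}
}
```

Because the loop depends only on the interface, adding a protocol comes down to writing a type with a run method and appending it to the slice.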

Monitoring support

Monitoring code lives under the monitor folder and currently supports importing metrics into InfluxDB and OpenTSDB.

Comprehensive metrics are provided.

Hardware requirements

Load      IPFIX PPS  CPU   RAM
low       < 1K       2-4   64M
moderate  < 10K      8+    256M
high      < 50K      12+   512M
x-high    < 100K     24+   1G

conclusion

1) NGINX supports proxying the UDP protocol.
2) For the final storage of the collected flow data, vFlow suggests three options: ClickHouse, MemSQL, and Spark.
3) We did not use vFlow in the end, because we wrote a flow input plug-in for Telegraf, which lets us make full use of the output plug-ins and filters that Telegraf provides.