Jaime Botello
2018-03-19 07:44:57 UTC
Hi,
We are working on using pmacct as a NetFlow producer for the pnda.io data
pipeline, and we are wondering whether anyone else has tried to integrate
pmacct with pnda.io.
pnda.io uses Kafka and Avro to get data into the pipeline.
After building all the required components into pmacct, we believe
something is still missing.
I'm using the following script to test-consume the data that is already in
the pipeline:
https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
In theory, running the above code should produce output like the
following:
https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
Instead, the script fails with Unicode decoding errors:
------------------------------------------------------------
ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
timestamp=1521442141690, timestamp_type=0, key=None,
value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\***@8\xea\x8e\x06\x8e
\x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19
06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19
06:36:00\x02&2018-03-19
06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None,
serialized_key_size=-1, serialized_value_size=246)
error
------------------------------------------------------------
Traceback (most recent call last):
  File "consumer.py", line 115, in run
    consume_message(message)
  File "consumer.py", line 78, in consume_message
    msg = reader.read(decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
    return self.read_data(self.writers_schema, self.readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
    return self.read_record(writers_schema, readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
    return decoder.read_utf8()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
    return unicode(self.read_bytes(), "utf-8")
UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
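If the topic actually carries pmacct's own Avro records rather than the
PNDA envelope, then decoding with the schema pmacct writes out via
avro_schema_output_file (see the config further down) should work where
the PNDA consumer fails. This is the stripped-down reader I've been
experimenting with to test that theory; it's only a sketch, and using
plain kafka-python as the consumer is my own assumption:
------------------------------------------------------------
# Sketch: decode the topic with the schema pmacct itself generated via
# avro_schema_output_file, instead of the PNDA envelope schema. Broker,
# topic, and schema path are the ones from my config below.
import io
import os

import avro.io
import avro.schema
from kafka import KafkaConsumer

SCHEMA_PATH = os.path.expanduser('~/pmacct/pnda.avsc')

schema = avro.schema.parse(open(SCHEMA_PATH).read())
reader = avro.io.DatumReader(schema)

consumer = KafkaConsumer('netflow', bootstrap_servers='10.180.221.130:9092')
for message in consumer:
    decoder = avro.io.BinaryDecoder(io.BytesIO(message.value))
    print(reader.read(decoder))
------------------------------------------------------------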
Currently, pnda.io uses an Avro plugin when integrating with Logstash; in
our case we would like to use pmacct instead. We believe that, in the case
of pmacct, something else may be required in order to push data correctly
into the pnda.io Kafka.
https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
One thing I noticed is that pnda.io and Logstash define, as part of the
Avro/Kafka plugin configuration:
value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
However, I was not able to find out whether this is something I can set up
in the pmacct/Kafka integration.
I'm including the relevant configuration below.
***@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
! ..
plugins: kafka
!
aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as, dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net, src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start, timestamp_end, timestamp_arrival
!
nfacctd_port: 2055
nfacctd_ip: 10.180.222.10
!
!
kafka_output: avro
avro_schema_output_file: ~/pmacct/pnda.avsc
kafka_topic: netflow
kafka_refresh_time: 60
kafka_history: 1m
kafka_history_roundoff: m
kafka_broker_host: 10.180.221.130
kafka_broker_port: 9092
***@ip-10-180-221-190:~/pmacct$ more pnda.avsc
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "src", "type": "string"},
    {"name": "host_ip", "type": "string"},
    {"name": "rawdata", "type": "bytes"}
  ]
}
***@ip-10-180-221-190:~/pmacct$
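One idea we are considering, in case pmacct cannot produce the PNDA
envelope natively: a small relay that consumes pmacct's raw Avro records
and republishes them wrapped in the pnda.entity schema above, with the
original payload carried opaquely in rawdata. This is only a sketch of the
idea; the output topic name and the use of kafka-python (whose default, as
I understand it, is to send the value bytes as-is, i.e. the equivalent of
ByteArraySerializer) are my assumptions:
------------------------------------------------------------
# Sketch of a relay: wrap each pmacct Avro record in the PNDA envelope.
# Output topic name is hypothetical.
import io
import time

import avro.io
import avro.schema
from kafka import KafkaConsumer, KafkaProducer

# The pnda.avsc envelope schema shown above, embedded so the relay does
# not depend on the file pmacct may overwrite.
ENVELOPE = '''
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "src", "type": "string"},
    {"name": "host_ip", "type": "string"},
    {"name": "rawdata", "type": "bytes"}
  ]
}
'''

envelope_schema = avro.schema.parse(ENVELOPE)
writer = avro.io.DatumWriter(envelope_schema)

consumer = KafkaConsumer('netflow', bootstrap_servers='10.180.221.130:9092')
# kafka-python sends the value bytes unmodified, equivalent to Java's
# ByteArraySerializer.
producer = KafkaProducer(bootstrap_servers='10.180.221.130:9092')

for message in consumer:
    buf = io.BytesIO()
    writer.write({
        'timestamp': int(time.time() * 1000),
        'src': 'pmacct',
        'host_ip': '10.180.222.10',  # nfacctd_ip from the config above
        'rawdata': message.value,    # pmacct's Avro record, untouched
    }, avro.io.BinaryEncoder(buf))
    producer.send('netflow.pnda', buf.getvalue())
------------------------------------------------------------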
Thanks for any light here.
--
Jaime Botello (aka Jimbo) // Riot Games