[pmacct-discussion] pmacct and pnda.io integration
Jaime Botello
2018-03-19 07:44:57 UTC
Hi,

We are working on using pmacct as a NetFlow producer for the pnda.io data
pipeline, and we are wondering whether anyone else has tried to integrate
pmacct with pnda.io.

pnda.io uses Kafka and Avro to get data into the pipeline.

After getting all the required components built into pmacct, we believe
something is still missing.

I'm using the following code just to test consuming the data that is in
the pipeline:

https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py

In theory, when running the above code, we should get output like the
following:
https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example

Instead, the script fails with Unicode decoding errors:

------------------------------------------------------------
ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
timestamp=1521442141690, timestamp_type=0, key=None,
value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\***@8\xea\x8e\x06\x8e
\x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19
06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19
06:36:00\x02&2018-03-19
06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None,
serialized_key_size=-1, serialized_value_size=246)
error
------------------------------------------------------------
Traceback (most recent call last):
  File "consumer.py", line 115, in run
    consume_message(message)
  File "consumer.py", line 78, in consume_message
    msg = reader.read(decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
    return self.read_data(self.writers_schema, self.readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
    return self.read_record(writers_schema, readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
    return decoder.read_utf8()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
    return unicode(self.read_bytes(), "utf-8")
UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
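
For reference, the decode step the example consumer performs can be
reproduced in a few lines. The following is only a sketch of my reading of
it, assuming the consumer decodes each message value against the PNDA
envelope schema (the pnda.avsc shown further down), and substituting
fastavro plus kafka-python for the avro package that consumer.py uses:

import io
import fastavro
from kafka import KafkaConsumer

# PNDA envelope schema (same fields as the pnda.avsc shown further down).
ENVELOPE = fastavro.parse_schema({
    "namespace": "pnda.entity",
    "type": "record",
    "name": "event",
    "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "src", "type": "string"},
        {"name": "host_ip", "type": "string"},
        {"name": "rawdata", "type": "bytes"},
    ],
})

consumer = KafkaConsumer("netflow", bootstrap_servers="10.180.221.130:9092")
for msg in consumer:
    # Schemaless (datum) decode of the raw message value, the same thing
    # consumer.py does with avro's DatumReader. If the bytes were written
    # with a different schema (e.g. pmacct's own flow record), the string
    # reads land on arbitrary binary and raise UnicodeDecodeError.
    record = fastavro.schemaless_reader(io.BytesIO(msg.value), ENVELOPE)
    print(record["timestamp"], record["src"], record["host_ip"])

If that reading is right, the error above would simply mean the topic is
carrying pmacct's native Avro records rather than PNDA-envelope records.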

Currently, pnda.io uses an Avro plugin when integrating with Logstash; in
our case, we would like to use pmacct instead. We believe that, in the case
of pmacct, something else may be required in order to push the data
correctly into the pnda.io Kafka.

https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md

One thing I notice that pnda.io and Logstash define as part of the
Avro/Kafka plugin configuration is

value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'


However, I was not able to find out whether this is something I can set up
in the pmacct/Kafka integration.

I'm attaching some relevant configuration.

***@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
! ..
plugins: kafka
!
aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as,
dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net,
src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start, timestamp_end,
timestamp_arrival
!
nfacctd_port: 2055
nfacctd_ip: 10.180.222.10
!
!
kafka_output: avro
avro_schema_output_file: ~/pmacct/pnda.avsc
kafka_topic: netflow
kafka_refresh_time: 60
kafka_history: 1m
kafka_history_roundoff: m
kafka_broker_host: 10.180.221.130
kafka_broker_port: 9092

***@ip-10-180-221-190:~/pmacct$ more pnda.avsc
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "src", "type": "string"},
    {"name": "host_ip", "type": "string"},
    {"name": "rawdata", "type": "bytes"}
  ]
}
***@ip-10-180-221-190:~/pmacct$
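
For what it's worth, here is a minimal sketch of what writing into this
envelope could look like from outside pmacct, assuming kafka-python and
fastavro; the broker and topic are taken from the configuration above, and
carrying the pmacct record as JSON inside rawdata is only an assumption on
my part (it mirrors the Logstash approach), not something pmacct does today:

import io
import json
import time
import fastavro
from kafka import KafkaProducer

# PNDA envelope schema, as in pnda.avsc above.
ENVELOPE = fastavro.parse_schema({
    "namespace": "pnda.entity",
    "type": "record",
    "name": "event",
    "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "src", "type": "string"},
        {"name": "host_ip", "type": "string"},
        {"name": "rawdata", "type": "bytes"},
    ],
})

def to_pnda(flow_record, source="pmacct", host_ip="10.180.222.10"):
    # Wrap one flow record (a dict) into the PNDA envelope and return the
    # schemaless Avro encoding, i.e. the raw bytes to put on the Kafka topic.
    buf = io.BytesIO()
    fastavro.schemaless_writer(buf, ENVELOPE, {
        "timestamp": int(time.time() * 1000),
        "src": source,
        "host_ip": host_ip,
        "rawdata": json.dumps(flow_record).encode("utf-8"),
    })
    return buf.getvalue()

# kafka-python sends whatever bytes it is given, which is effectively the
# ByteArraySerializer behaviour from the Logstash/Java side.
producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092")
producer.send("netflow", value=to_pnda({"ip_proto": "udp", "bytes": 1234}))
producer.flush()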

thanks for any light here
--
Jaime Botello (aka Jimbo) // Riot Games
Paolo Lucente
2018-03-20 23:22:54 UTC
Hi Jaime,

While I can't help you with this directly, I'm trying to see whether it can
be investigated further with the PNDA team (as a result of that, I
understand you are also in touch with them). I'll keep you posted in case
of any news.

Paolo
Jaime Botello
2018-03-21 01:00:17 UTC
Thank you, Paolo. I will update the group as well if we have any news.
--
Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
Tim Raphael
2018-03-21 01:06:29 UTC
I've also been keeping a keen eye on the pnda.io guide page for pmacct
(still blank). Who is actually working on this one, @Jaime?

- Tim
Jaime Botello
2018-03-22 02:56:42 UTC
Hey Paolo,

I was thinking about this after reading a little bit more about how data is
deserialized by pnda.io.

For example, if I download a file from the PNDA HDFS and read it using
avro-tools, you can see that PNDA (Kafka) was not able to deserialize the
data. Since PNDA uses byte-array deserialization, and their Logstash
integration notes clearly mention they use byte-array serialization, don't
you think we could fix this by just adding byte-array serialization to the
pmacct Kafka plugin?

Let me know if this makes sense.

thanks

***@ip-10-180-221-47:~/datasets$ java -jar avro-tools-1.8.2.jar tojson
f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
{"topic":"netflow","timestamp":1521676850049,"reason":{"string":"*Unable to
deserialize data*"},"payload":"{\"event_type\": \"purge\", \"as_src\":
6507, \"as_dst\": 5739, \"peer_ip_src\": \"x.x.x.x\", \"peer_ip_dst\":
\"\", \"iface_in\": 654, \"iface_out\": 659, \"ip_src\": \"x.x.x.x\",
\"net_src\": \"x.x.x.x\", \"ip_dst\": \"x.x.x.x\", \"net_dst\":
\"x.x.x.x\", \"mask_src\": 32, \"mask_dst\": 16, \"port_src\": 2099,
\"port_dst\": 55764, \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\", \"tos\":
0, \"sampling_rate\": 1000, \"timestamp_start\": \"2018-03-16
22:17:29.856000\", \"timestamp_end\": \"2018-03-16 22:17:29.856000\",
\"timestamp_arrival\": \"2018-03-16 22:18:30.893573\", \"stamp_inserted\":
\"2018-03-16 22:17:00\", \"stamp_updated\": \"2018-03-16 22:19:01\",
\"packets\": 1, \"bytes\": 667, \"writer_id\": \"default_kafka/2971\"}"}
--
Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
Paolo Lucente
2018-03-24 20:28:06 UTC
Hey Jaime,

What you say does make sense to me, and I would be up for this dev work.
Can I ask whether it would be possible to access your deployment (since I
do not have the PNDA framework deployed anywhere)? It would make
development and subsequent testing easier. If yes, we can follow up
privately.

Paolo
Jaime Botello
2018-03-26 13:05:00 UTC
Hi Paolo,

Outside access to the environment is going to be difficult since it is part
of our production environment; however, we may be able to arrange some
remote sessions if that would work.

Having said that, we were able to find a workaround that works as follows:

Since pmacct can't serialize the data into something that pnda.io
understands, we set up a Logstash instance that serves as a message
translator between pmacct and pnda.io.

Logstash is configured to use the pnda-avro codec plugin, which supports
byte-array serialization. This seems to be working for now, and it buys us
some time to figure out how we can integrate pmacct directly with pnda.io,
so we can increase the overall throughput of the system and maximize
efficiency.
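
A minimal sketch of such a Logstash pipeline, along the lines of the PNDA
Logstash guide linked earlier. The staging topic name and the idea of
feeding Logstash from a pmacct JSON Kafka topic are assumptions on my part,
and the option names should be checked against the plugin versions in use:

input {
  # JSON records produced by the pmacct kafka plugin to a staging topic
  kafka {
    bootstrap_servers => "10.180.221.130:9092"
    topics => ["netflow-raw"]
    codec => "json"
  }
}
output {
  # PNDA-facing topic, encoded by the pnda-avro codec and shipped as raw bytes
  kafka {
    bootstrap_servers => "10.180.221.130:9092"
    topic_id => "netflow"
    codec => pnda-avro
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
  }
}

Depending on the codec version, the envelope fields (src, host_ip,
timestamp) may also need to be populated via filter stanzas before the
output stage.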

If there's any interest in the details, I can share some of the
documentation we are working on right now.

thank you

--Jaime
--
Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
Anthony Caiafa
2018-03-26 13:27:52 UTC
Just adding my two cents here: that seems like quite a few steps, going
back and forth to Kafka. You should look at Apache NiFi, which can take the
data from pmacct and cut down on all of those steps.
Jaime Botello
2018-03-26 13:44:54 UTC
Hi Anthony,

I'm not familiar with it; thank you. I will bring this to our team, which
is working on figuring out how to remove Logstash from the workflow.

--Jaime
--
*Jaime Botello* (aka Jimbo) // *Riot Games* : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
Anthony Caiafa
2018-03-26 14:02:24 UTC
Permalink
It's a really simple and solid open-source product. Absolutely worth the
effort since it is extremely powerful.
Post by Jaime Botello
Hi Anthony,
Not familiar with it, thank you. I will bring this to our team working on
figuring out how to remove logstash from the workflow.
--Jaime
Post by Anthony Caiafa
Just adding my 2 cents here. It seems like quite a few steps and a lot of going
back and forth to Kafka. You should look at Apache NiFi, which will take the
data from pmacct and cut down on all of those steps.
Post by Jaime Botello
Hi Paolo,
Outside access to the environment is going to be difficult since it is part
of our production environment; however, we may be able to arrange some
remote sessions if that's something that would work.
Since pmacct can't serialize the data into something that pnda.io would
understand, we set up a Logstash instance that serves as a message
translation layer between pmacct and pnda.io.
Logstash is configured to use the pnda-avro codec plugin, which supports
byte-array serialization (ByteArraySerializer). This seems to be working for
now and it will give us some time to figure out how we can integrate
pmacct directly with pnda.io, so we can increase the overall throughput
of the system and maximize efficiency.
If there's any interest in the details, I can share some of the
documentation we are working on right now.
thank you
--Jaime
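For illustration, here is a minimal sketch (in Python) of what such a
translation layer amounts to: wrap each pmacct record into the PNDA envelope
schema (the pnda.avsc shown earlier in the thread, with timestamp, src,
host_ip and rawdata fields) and produce the Avro-encoded bytes to Kafka. It
assumes the stock avro and kafka-python packages; the broker address, topic
and schema path are taken from the example configuration in this thread,
while the send_pnda_event() helper and the "src" value are purely
hypothetical.

import io
import json
import time

import avro.schema
from avro.io import DatumWriter, BinaryEncoder
from kafka import KafkaProducer

# PNDA envelope schema: timestamp, src, host_ip, rawdata
schema = avro.schema.parse(open("pnda.avsc").read())
writer = DatumWriter(schema)

# kafka-python hands the broker raw bytes, which is effectively what the
# ByteArraySerializer setting on the Logstash/Java side amounts to
producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092")

def send_pnda_event(pmacct_record):
    # Wrap one pmacct JSON record as the opaque rawdata payload
    buf = io.BytesIO()
    writer.write({
        "timestamp": int(time.time() * 1000),
        "src": "pmacct",
        "host_ip": "10.180.222.10",
        "rawdata": json.dumps(pmacct_record).encode("utf-8"),
    }, BinaryEncoder(buf))
    producer.send("netflow", buf.getvalue())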
Post by Paolo Lucente
Hey Jaime,
What you say does make sense to me, and I would be up to do this dev. Can I
ask you whether it would be a possibility to access your deployment (since I
do not have the PNDA framework deployed anywhere)? It would make development
and subsequent testing easier. If yes, we can follow up privately.
Paolo
Post by Jaime Botello
Hey Paolo,
I was thinking about this after reading a little bit more on how data is
deserialized by pnda.io.
For example, if I download the file from PNDA HDFS and read it using avro
tools, you can see PNDA (Kafka) was not able to deserialize the data.
Since PNDA uses byte-array deserialization, and by reading their logstash
integration notes, they clearly mention they use byte-array serialization,
don't you think we could fix this by just adding byte-array serialization
into the pmacct kafka plugin?
Let me know if this makes sense.
thanks
tojson f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
{"topic":"netflow","timestamp":1521676850049,"reason":{"string":"*Unable to
deserialize data*"},"payload":"{\"event_type\": \"purge\",
\"\", \"iface_in\": 654, \"iface_out\": 659, \"ip_src\": \"x.x.x.x\",
\"x.x.x.x\", \"mask_src\": 32, \"mask_dst\": 16, \"port_src\": 2099,
\"port_dst\": 55764, \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\",
0, \"sampling_rate\": 1000, \"timestamp_start\": \"2018-03-16
22:17:29.856000\", \"timestamp_end\": \"2018-03-16 22:17:29.856000\",
\"timestamp_arrival\": \"2018-03-16 22:18:30.893573\",
\"2018-03-16 22:17:00\", \"stamp_updated\": \"2018-03-16 22:19:01\",
\"packets\": 1, \"bytes\": 667, \"writer_id\": \"default_kafka/2971\"}"}
Paolo Lucente
2018-03-27 15:04:01 UTC
Permalink
Hi Jaime,

Please, yes, it would be great if you could share some documentation on
this topic. As I'm wrapping up documentation for releasing 1.7.1, this
would be very timely. We could insert it in the shape of a FAQ or as a
section in the QUICKSTART guide. I look forward to it.

It would also help me frame the issue better. And we could refine it
later with input from Anthony.

Paolo
_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists