Apache Spark - Difference between fromOffset/untilOffset/offset.count() and the total records in the RDD partition


My Spark-Kafka streaming logic is shown below.

When I look at the offsets and the records, the offset.count() of a partition differs from the total number of records printed when I use a foreach loop (on the RDD partition).

These two counts are inconsistent with each other.
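For reference when comparing the counts: Spark's `OffsetRange.count()` is `untilOffset - fromOffset`, and `untilOffset` is exclusive, so "fromOffset 0 untilOffset 2" covers offsets 0 and 1, i.e. two records. The plain-Java sketch below only spells out that half-open arithmetic; the `Range` class is a hypothetical stand-in, not Spark's `OffsetRange`.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetCountDemo {
    // Hypothetical stand-in for Spark's OffsetRange; untilOffset is exclusive.
    static final class Range {
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        // Same arithmetic as OffsetRange.count(): untilOffset - fromOffset.
        long count() {
            return untilOffset - fromOffset;
        }

        // The individual offsets in the half-open range [fromOffset, untilOffset).
        List<Long> offsets() {
            List<Long> result = new ArrayList<>();
            for (long o = fromOffset; o < untilOffset; o++) {
                result.add(o);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Range[] ranges = { new Range(0, 0, 2), new Range(1, 0, 3) };
        for (Range r : ranges) {
            System.out.println("partition " + r.partition + ": count=" + r.count()
                    + ", offsets=" + r.offsets());
        }
    }
}
```

This prints count=2 with offsets [0, 1] for partition 0 and count=3 with offsets [0, 1, 2] for partition 1, matching the offset.count() values quoted in the question's comments.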

Can someone guide me to the correct logic?

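    import java.util.ArrayList;

    import kafka.message.MessageAndMetadata;
    import kafka.serializer.DefaultDecoder;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // jsc, kafkaParams and topicMap are defined elsewhere.
    JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
            StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
            (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreachPartition(itr -> {
            Integer partnId = TaskContext.get().partitionId();

            // Buffer all records of this RDD partition.
            ArrayList<byte[]> recordBatch = new ArrayList<byte[]>();
            while (itr.hasNext()) {
                byte[] record = itr.next();
                recordBatch.add(record);
            }

            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // partition 0 fromOffset 0 untilOffset 2 offset.count(): 2
            // partition 1 fromOffset 0 untilOffset 3 offset.count(): 3
            for (OffsetRange offset : offsets) {
                if (offset.partition() == partnId) {
                    recordBatch.forEach((rec) -> {
                        // partition 0 fromOffset 0 untilOffset 2 offset.count(): 2 --> total records after looping: 3
                        // partition 1 fromOffset 0 untilOffset 3 offset.count(): 3 --> total records after looping: 3
                        // business logic goes here.
                    });
                }
            }
        });
    });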
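One plausible cause, not confirmed by the question: `offset.partition()` is the Kafka partition number, whereas `TaskContext.get().partitionId()` is the index of the RDD partition. For the direct stream, RDD partition i is built from `offsetRanges()[i]`, but the order of that array is not guaranteed to follow Kafka partition numbers, so Kafka partition 0 need not land in RDD partition 0. The pattern in the Spark Kafka integration guide is to read `((HasOffsetRanges) rdd.rdd()).offsetRanges()` once at the top of `foreachRDD` (which runs on the driver) and, inside `foreachPartition`, take `offsets[TaskContext.get().partitionId()]`. That one-to-one mapping holds only for the RDD produced directly by `createDirectStream`, before any shuffle or repartition. The plain-Java model below has no Spark dependency; `Range`, the two lookup methods, and the sample ranges are hypothetical and only contrast matching by Kafka partition number with positional lookup.

```java
public class OffsetRangeLookupDemo {
    // Hypothetical stand-in for Spark's OffsetRange: a Kafka partition plus a half-open offset range.
    static final class Range {
        final int kafkaPartition;
        final long fromOffset;
        final long untilOffset;

        Range(int kafkaPartition, long fromOffset, long untilOffset) {
            this.kafkaPartition = kafkaPartition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        long count() {
            return untilOffset - fromOffset;
        }
    }

    // Mirrors the question's loop: treats the RDD partition index as a Kafka partition number.
    // Returns null if no range has that Kafka partition number.
    static Range lookupByKafkaPartition(Range[] ranges, int rddPartitionId) {
        for (Range r : ranges) {
            if (r.kafkaPartition == rddPartitionId) {
                return r;
            }
        }
        return null;
    }

    // Positional lookup: RDD partition i was built from ranges[i].
    static Range lookupByPosition(Range[] ranges, int rddPartitionId) {
        return ranges[rddPartitionId];
    }

    public static void main(String[] args) {
        // Assumed order: Kafka partition 1 happens to come first in the array,
        // so RDD partition 0 holds Kafka partition 1's records.
        Range[] ranges = { new Range(1, 0, 3), new Range(0, 0, 2) };
        for (int rddPartition = 0; rddPartition < ranges.length; rddPartition++) {
            Range byKafka = lookupByKafkaPartition(ranges, rddPartition);
            Range byPosition = lookupByPosition(ranges, rddPartition);
            System.out.println("RDD partition " + rddPartition
                    + ": match-by-Kafka-partition count=" + byKafka.count()
                    + ", positional count=" + byPosition.count());
        }
    }
}
```

With these sample ranges, positional lookup reports 3 and 2 for RDD partitions 0 and 1, while matching on the Kafka partition number reports 2 and 3, so each RDD partition is compared against the other partition's range.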

