hadoop - Flink throwing serialization error when reading from HBase
When reading from HBase using a RichFlatMapFunction inside a map, I am getting a serialization error. What I am trying to do: if a datastream element equals a particular string, read from HBase, else ignore it. Below are the sample program and the error I am getting.
package com.abb.flinktest

import java.text.SimpleDateFormat
import java.util.Properties
import scala.collection.concurrent.TrieMap
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction

object FlinkTestHbaseRead {

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val kafkaStream = env.fromElements("hello")
    // line 45 in the stack trace: the lambda passed to map closes over kafkaStream itself
    val c = kafkaStream.map(x => if (x.equals("hello")) kafkaStream.flatMap(new ReadHbase()))
    env.execute()
  }

  class ReadHbase extends RichFlatMapFunction[String, Tuple11[String, String, String, String, String, String, String, String, String, String, String]] with Serializable {

    var conf: org.apache.hadoop.conf.Configuration = null
    var table: org.apache.hadoop.hbase.client.HTable = null
    var hbaseConnection: org.apache.hadoop.hbase.client.Connection = null
    var taskNumber: String = null
    var rowNumber = 0
    val serialVersionUID = 1L

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
      conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")
      conf.addResource(in)
      hbaseConnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testTable")
      // this.taskNumber = String.valueOf(taskNumber)
    }

    override def flatMap(msg: String, out: Collector[Tuple11[String, String, String, String, String, String, String, String, String, String, String]]) {
      // flatMap operation here
    }

    override def close() {
      table.flushCommits()
      table.close()
    }
  }
}
Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
    at com.abb.flinktest.FlinkTestHbaseRead$.main(FlinkTestHbaseRead.scala:45)
    at com.abb.flinktest.FlinkTestHbaseRead.main(FlinkTestHbaseRead.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
    - field (class "com.abb.flinktest.FlinkTestHbaseRead$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
    - root object (class "com.abb.flinktest.FlinkTestHbaseRead$$anonfun$1", <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more
I tried wrapping the field inside a method and a class, and making the class serializable as well, but no luck. Could someone throw some light on this or suggest a workaround?
The problem is that you're trying to access the Kafka stream variable inside the map function, and a DataStream is simply not serializable. It is just an abstract representation of the data; it doesn't contain anything, which invalidates your function in the first place.
Instead, do this:
kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHbase())
The filter function will retain only the elements for which the condition is true, and those will then be passed to your flatMap function.
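Put together, a minimal corrected driver could look like the sketch below. This is only a sketch under the question's setup: it assumes the ReadHbase function from the question is in scope unchanged, and the object name is illustrative. The point is that no user function closes over a DataStream, so everything Flink has to serialize is an ordinary function object.

import org.apache.flink.streaming.api.scala._

object FlinkTestHbaseReadFixed {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val kafkaStream = env.fromElements("hello", "other")

    // Keep only the matching elements, then enrich each one from HBase.
    // Neither the filter nor the flatMap captures the DataStream itself,
    // so the closure cleaner has nothing non-serializable to reject.
    val enriched = kafkaStream
      .filter(x => x.equals("hello"))
      .flatMap(new ReadHbase())

    env.execute()
  }
}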
I highly recommend you read the Basic API Concepts section of the documentation, as there appears to be a misunderstanding about what happens when you specify a transformation.
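To illustrate that last point with a small sketch (names are illustrative): calling map or filter runs nothing by itself. It only declares a node in the dataflow graph, and Flink checks the user function for serializability right at that call site (the ClosureCleaner.ensureSerializable frames in the stack trace above), because the function must later be shipped to the workers.

import org.apache.flink.streaming.api.scala._

object TransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    // These calls only build the dataflow graph; each lambda is cleaned
    // and serialization-checked here, which is why a captured DataStream
    // fails before the job even starts.
    val declared = env
      .fromElements("hello", "world")
      .filter(_.equals("hello"))
      .map(_.toUpperCase)

    declared.print()

    // Only this call actually deploys and runs the graph.
    env.execute()
  }
}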