I want to migrate a huge table from Cassandra using Flink, so I do:
@Override
public Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> getCassandraSource(ClusterBuilder clusterBuilder, String keyspace) {
    String query = String.format("SELECT * FROM %s.tablename;", keyspace);
    // Target maximum size per split; the connector divides the table into splits accordingly.
    long maxSplitMemorySize = MemorySize.ofMebiBytes(10).getBytes();
    return new OPTCassandraSource<>(clusterBuilder,
            maxSplitMemorySize,
            HistoryReadout.class,
            query,
            () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)});
}
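For completeness, the ClusterBuilder passed into this method is minimal; a sketch of it (the contact point and port are placeholders, not my real settings):

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Placeholder connection settings; the real values come from configuration.
        return builder.addContactPoint("cassandra-host")
                .withPort(9042)
                .build();
    }
};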
Then I wire it into a standard Source → Transform → Sink pipeline:
Source<HistoryReadout, CassandraSplit, CassandraEnumeratorState> cassandraSource = getCassandraSource(clusterBuilder, keyspace);
// Attach the bounded source to the StreamExecutionEnvironment (env) so it produces a DataStream.
DataStream<HistoryReadout> cassandraStream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), "Cassandra source");
DataStream<Tuple2<String, HistoryReadout>> processedStream = cassandraStream.process(
        new ProcessFunction<>() {
            @Override
            public void processElement(HistoryReadout readout, Context context, Collector<Tuple2<String, HistoryReadout>> out) {
                System.out.println("In process element " + readout);
                try {
                    String tableName = HistoryReadoutCassandraServiceUtil.getRowTableName(keyspace, readout);
                    System.out.println(tableName);
                    out.collect(new Tuple2<>(tableName, readout));
                } catch (Exception e) {
                    System.err.printf("Error generating table name for record: %s, Error: %s%n", readout, e.getMessage());
                }
            }
        }
);
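The sink and job submission are omitted above because I don't think they matter here; as a minimal sketch (print() stands in for my real per-table Cassandra sink, and the job name is made up):

// Placeholder sink: the real job writes each readout to the table named in f0.
processedStream.print();
env.execute("cassandra-table-migration");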
When I run this, what I see while debugging is that the connector keeps issuing new queries with token(...) ranges and never reaches the processElement step. I need to process this huge table in batches. What am I doing wrong here? Thanks