Could you explain the difference between a task slot and parallelism in Apache Flink v1.9?
Here is my understanding so far:
- Flink says that a TaskManager is the worker PROCESS, and that normally you should have one TaskManager per machine.
- Let's say I have 3 computers and each of them has 16 CPU cores. Each computer will run one TaskManager, so I will have 3 TaskManagers.
- I thought that if one computer has 16 CPU cores, then its TaskManager could create at most 16 task slots, so there would be CPU isolation. However, the Flink documentation says: "Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks."
- Does that mean 16 slots = 16 threads? And can numberOfSlot be >= numberOfCpuCores?
If a task slot is a thread, could this lead to shared data access problems, race conditions, etc.? This is my first question.
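To make the numbers in my first question concrete, here is a small plain-Java sketch (no Flink API involved) of the arithmetic as I currently understand it. The class and method names are made up for illustration. It assumes the number of slots per TaskManager is whatever is configured in taskmanager.numberOfTaskSlots, and it relies on the statement in the docs that, with slot sharing, a job needs as many slots as its highest parallelism. Please correct me if either assumption is wrong.

```java
// Plain Java, no Flink dependency: only the slot arithmetic for the setup described above.
final class SlotMath {

    // Total slots offered by the cluster, assuming one TaskManager per machine.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Assumption: with default slot sharing, a job needs as many slots
    // as its highest operator parallelism.
    static boolean fits(int jobParallelism, int totalSlots) {
        return jobParallelism <= totalSlots;
    }

    public static void main(String[] args) {
        int slots = totalSlots(3, 16); // 3 machines, 16 slots each
        System.out.println("total slots: " + slots);
        System.out.println("parallelism 2 fits: " + fits(2, slots));
    }
}
```

Note that nothing in this arithmetic ties the slot count to the 16 CPU cores, which is exactly why I am unsure how slots relate to threads.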
- The second question is the one I asked at the beginning of my post: the difference between a task slot and parallelism. I am talking about env.setParallelism(number).
- Let's say my parallelism is 2.
- Will each task slot (thread or whatever it is) then be executed with 2 threads?
- If so, could this lead to shared data access problems, race conditions, etc.?
- If not, what does parallelism mean?
- Here is an example. In this example, do I need to take special care when writing the apply() method because of the multi-threaded environment?
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// SensorReading, SensorSource and SensorTimeAssigner are helper classes that are not shown here.
public class AverageSensorReadings {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = env.getParallelism();
        int maxParal = env.getMaxParallelism();

        // ingest sensor stream
        DataStream<SensorReading> sensorData = env
            // SensorSource generates random temperature readings
            .addSource(new SensorSource())
            // assign timestamps and watermarks which are required for event time
            .assignTimestampsAndWatermarks(new SensorTimeAssigner());

        DataStream<SensorReading> avgTemp = sensorData
            // convert Fahrenheit to Celsius using an inlined map function
            .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
            // organize stream by sensor
            .keyBy(r -> r.id)
            // group readings in 4-second windows
            .timeWindow(Time.seconds(4))
            // compute average temperature using a user-defined function
            .apply(new TemperatureAverager());

        // print result stream to standard out
        //avgTemp.print();
        System.out.println("paral: " + parallelism + " max paral: " + maxParal);

        // execute application
        env.execute("Compute average sensor temperature");
    }

    public static class TemperatureAverager extends RichWindowFunction<SensorReading, SensorReading, String, TimeWindow> {

        /**
         * apply() is invoked once for each window.
         *
         * @param sensorId the key (sensorId) of the window
         * @param window meta data for the window
         * @param input an iterable over the collected sensor readings that were assigned to the window
         * @param out a collector to emit results from the function
         */
        @Override
        public void apply(String sensorId, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) {
            System.out.println("APPLY FUNCTION START POINT");
            System.out.println("sensorId: " + sensorId + "\n");

            // compute the average temperature
            int cnt = 0;
            double sum = 0.0;
            for (SensorReading r : input) {
                System.out.println("collected item: " + r);
                cnt++;
                sum += r.temperature;
            }
            double avgTemp = sum / cnt;

            System.out.println("APPLY FUNCTION END POINT");
            System.out.println("----------------------------\n\n");

            // emit a SensorReading with the average temperature
            out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
        }
    }
}
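
To pin down what I am asking about thread safety, here is a plain-Java sketch (no Flink API) of the model I would hope for. It assumes that each parallel subtask gets its own function instance, driven by a single thread, and that all records with the same key go to the same subtask. ParallelSubtaskSketch, Averager and subtaskFor are hypothetical names, and the modulo routing is only a simplification, not how Flink actually assigns keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelSubtaskSketch {

    // Hypothetical stand-in for one parallel instance of a user function.
    // Its fields are touched by exactly one thread, so no locking is used.
    static final class Averager {
        private final Map<String, double[]> sumAndCount = new HashMap<>();

        void add(String key, double value) {
            double[] acc = sumAndCount.computeIfAbsent(key, k -> new double[2]);
            acc[0] += value; // running sum
            acc[1] += 1;     // running count
        }

        Map<String, Double> averages() {
            Map<String, Double> result = new HashMap<>();
            sumAndCount.forEach((k, acc) -> result.put(k, acc[0] / acc[1]));
            return result;
        }
    }

    // Simplified routing: the same key always maps to the same subtask index.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Map<String, Double> run(List<String> keys, List<Double> values, int parallelism) {
        // Partition record indices by key, one bucket per subtask.
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            buckets.get(subtaskFor(keys.get(i), parallelism)).add(i);
        }

        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Map<String, Double>>> futures = new ArrayList<>();
            for (List<Integer> bucket : buckets) {
                futures.add(pool.submit(() -> {
                    Averager instance = new Averager(); // private to this subtask's thread
                    for (int i : bucket) {
                        instance.add(keys.get(i), values.get(i));
                    }
                    return instance.averages();
                }));
            }
            // Keys never overlap between subtasks, so merging cannot overwrite anything.
            Map<String, Double> merged = new HashMap<>();
            for (Future<Map<String, Double>> f : futures) {
                merged.putAll(f.get());
            }
            return merged;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "b");
        List<Double> values = Arrays.asList(1.0, 10.0, 3.0, 20.0);
        System.out.println(run(keys, values, 2));
    }
}
```

If Flink works like this, then apply() would only ever touch its own instance's fields from one thread, and only state shared across instances (for example static fields) would need synchronization. That is the part I would like to have confirmed.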