Tag Archives: spliterator

Parallel Streams and Spliterators

Today we are going to look at one of the aspects where using streams is a real win – when we need to thread work. As well as parallel streams, we will also look at Spliterators which acts as the machinery which pushes elements into the pipeline.

Streams use a technique known as internal iteration. It’s internal because the Iterator (or in our case Spliterator) which supplies work through our stream is hidden from us. To use a stream all we need do [once we have a source] is add the stages of the pipeline and supply the functions that these stages require. We don’t need to know how the data is being passed along the pipeline, just that it is. The benefit is that the workings are hidden from us and we can focus more on the work that must be done rather than how it can be done.

The opposite, external iteration is where we are given a loop variable or iterator and we look up the value, and pass it through the code ourselves. This obviously gives us a benefit in that have full control and low overhead. The downside is we have to do all the work looking up the values and passing them through the loop body. This will also mean more test code, and testing loop bodies properly can be tricky. With normal for-loops we also have to be careful of one-off errors.

The question we need to ask ourselves when considering the iteration method: Do we really need absolute control for the task? Streams do some things really well but come with a small performance penalty. Perhaps a non-stream (or even non-Java) solution is more appropriate for high performance work. On the other hand, sorting and filtering files to display in say a ‘recently accessed’ menu item doesn’t require high performance. In that case we’d probably settle for an easy and quick way to do it rather than the best performing one. Even if we go with a performant solution some benchmarking will be necessary as surprises often await. Thus we’re trading convenience off against performance, development time and risk of bugs.

Streams are easy to parallelise as we’ll see. We just change the type of the stream to a parallel stream using the parallel() operator. To do this with internal iteration is hard because it’s set up that we get one item per iteration. The best we can do in that environment is pass work off to threads. To do things efficiently we’d probably have to ditch looping through all the values in the outer loop and look at dividing the work up another way. We’ll see a way of doing this.

With that in mind we’ll look at a prime number generator. First this is not the most efficient prime number generator. For a demonstration it was useful to have an application that was well known, easy to understand, easy to perform with streams and would take a fair bit of computation time to complete.

Let’s look at the internal iteration version first:

EDIT: It’s been pointed out that the simple test (i % j == 0) is better than (i / j * j == i)

public class ForLoopPrimes
{
  public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    Set<Integer> s = new HashSet<>();

    // The candidates to try (1 is not a prime number by definition!)
    outer:
    for (int i = 2; i <= maxPrimeTry; i++)
    {
      // Only need to try up to sqrt(i) - see notes
      int maxJ = (int) Math.sqrt(i);

      // Our divisor candidates
      for (int j = 2; j <= maxJ; j++)
      {
        // If we can divide exactly by j, i is not prime
        if (i / j * j == i)
        {
          continue outer;
        }
      }

      // If we got here, it's prime
      s.add(i);
    }

    return s;
  }

  public static void main(String args[])
  {
     int maxPrimeTry = 9999999;

     long startTime = System.currentTimeMillis();

     Set<Integer> s = findPrimes(maxPrimeTry);

     long timeTaken = System.currentTimeMillis() - startTime;

     s.stream().sorted().forEach(System.out::println);

     System.out.println("Time taken: " + timeTaken);
  }
}

Note: Since we only need to find one divisor, and multiplication is commutative, we only need to exhaust all potential pairs of factors and test one of them [the smaller]. The smaller can’t be any bigger than the square root of the candidate prime and must be at least 2.

This is an example of a brute force algorithm. We’re trying every combination rather than using any stealth or optimisation. We’d also in this case expect the internal iteration version to run fast since there is not a lot of work per iteration.

So why do we have to demonstrate this?

Suppose we want to take advantage of hardware in modern processors and thread this up. How might we do it? Up to Java 7 and certainly before Java 5 this would have been a real pain. We’ve got to divide up the workload, maintain a pool of threads and signal them that there is work available and then collect the work back from them when done. We probably also want to shut the worker threads down at the end if we have any more work to do. While it’s not rocket science, it can be hard to get right quickly and subtle bugs can be hard to spot.

Java 7 makes this a lot easier with the ForkJoin framework. It’s still tricky and easy to get wrong. We’ll use a RecursiveAction to break up the outer loop into pieces of work using a divide-and-conqueror strategy. Note that parallel streams do this as well.

public class ForkJoinPrimes
{
  private static int workSize;
  private static Queue<Results> resultsQueue;

  // Use this to collect work
  private static class Results
  {
    public final int minPrimeTry;
    public final int maxPrimeTry;
    public final Set resultSet;

    public Results(int minPrimeTry, int maxPrimeTry, Set resultSet)
    {
      this.minPrimeTry = minPrimeTry;
      this.maxPrimeTry = maxPrimeTry;
      this.resultSet = resultSet;
    }
  }

  private static class FindPrimes extends RecursiveAction
  {
    private final int start;
    private final int end;

    public FindPrimes(int start, int end)
    {
      this.start = start;
      this.end = end;
    }

    private Set<Integer> findPrimes(int minPrimeTry,
                                    int maxPrimeTry)
    {
      Set<Integer> s = new HashSet<>();

      // The candidates to try
      // (1 is not a prime number by definition!)
      outer:
      for (int i = minPrimeTry; i <= maxPrimeTry; i++)
      {
        // Only need to try up to sqrt(i) - see notes
        int maxJ = (int) Math.sqrt(i);

        // Our divisor candidates
        for (int j = 2; j <= maxJ; j++)
        {
          // If we can divide exactly by j, i is not prime
          if (i / j * j == i)
          {
            continue outer;
          }
        }

        // If we got here, it's prime
        s.add(i);
      }

      return s;
    }

    protected void compute()
    {
      // Small enough for us?
      if (end - start < workSize)
      {
        resultsQueue.offer(new Results(start, end,
                                 findPrimes(start, end)));
      }
      else
      {
        // Divide into two pieces
        int mid = (start + end) / 2;

        invokeAll(new FindPrimes(start, mid),
                            new FindPrimes(mid + 1, end));
      }
    }
  }

  public static void main(String args[])
  {
    int maxPrimeTry = 9999999;
    int maxWorkDivisor = 8;

    workSize = (maxPrimeTry + 1) / maxWorkDivisor;

    ForkJoinPool pool = new ForkJoinPool();

    resultsQueue = new ConcurrentLinkedQueue<>();

    long startTime = System.currentTimeMillis();

    pool.invoke(new FindPrimes(2, maxPrimeTry));

    long timeTaken = System.currentTimeMillis() - startTime;

    System.out.println("Number of tasks executed: " +
                       resultsQueue.size());

    while (resultsQueue.size() > 0)
    {
      Results results = resultsQueue.poll();

      Set<Integer> s = results.resultSet;

      s.stream().sorted().forEach(System.out::println);
    }

    System.out.println("Time taken: " + timeTaken);
  }
}

This is quite recognisable since we have reused the sequential code to carry out the work in a subtask. We create two RecursiveActons to break the workload into two pieces. We keep breaking down until the workload is below a certain size when we carry out the action. We finally collect our results on a concurrent queue. Note there is a fair bit of code.

Let’s look at a sequential Java 8 streams solution:

public class SequentialStreamPrimes
{
  public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    return IntStream.rangeClosed(2, maxPrimeTry)
                    .map(i -> IntStream.rangeClosed(2,
                                          (int) (Math.sqrt(i)))
                    .filter(j -> i / j * j == i).map(j -> 0)
                    .findAny().orElse(i))
                    .filter(i -> i != 0)
                    .mapToObj(i -> Integer.valueOf(i))
                    .collect(Collectors.toSet());
  }

  public static void main(String args[])
  {
    int maxPrimeTry = 9999999;

    long startTime = System.currentTimeMillis();

    Set<Integer> s = findPrimes(maxPrimeTry);

    long timeTaken = System.currentTimeMillis() - startTime;

    s.stream().sorted().forEach(System.out::println);

    System.out.println("Time taken: " + timeTaken);
  }
}

EDIT: A better and quicker version was posted on DZone by Tom De Greyt. Out of courtesy I’ve asked for permission to repost his solution rather than just add it here, but it would also serve as a good exercise for the reader to try to find it. Hint: it involves a noneMatch. If you want to see it, it’s in the comments on the link but it would be beneficial to try to spend a few minutes to find it first.

We can see the streams solution matches up with the external iteration version quite well except for a few tricks needed:

  • Since we only need one factor we use findAny(). This acts like the break statement.
  • findAny() returns an Optional so we need to unwrap it to get our value. If we have no value (i.e. we found a prime) we will store the prime (the outer value, i) by putting it in the orElse clause.
  • If the inner IntStream finds a factor, we can map to 0 which we can filter out before storing.

So let’s make it threaded. We only need to change the findPrimes method slightly:

  public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    return IntStream.rangeClosed(2, maxPrimeTry)
                    .parallel()
                    .map(i -> IntStream.rangeClosed(2,
                                          (int) (Math.sqrt(i)))
                    .filter(j -> i / j * j == i).map(j -> 0)
                    .findAny().orElse(i))
                    .filter(i -> i != 0)
                    .mapToObj(i -> Integer.valueOf(i))
                    .collect(Collectors.toSet());
  }

This time we don’t have to mess around with the algorithm. Simply by adding an intermediate stage parallel() to the stream we make it divide up the work. Parallel(), like filter and map, is an intermediate operation. Intermediate operations can also change the behaviour of a stream as well as affect the passing values. Other intermediate stages we’re not seen yet are:

  • sequential() – make the stream sequential
  • distinct() – only distinct values pass
  • sorted() – a sorted stream is returned, optionally we can pass a Comparator
  • unordered() – return an unordered stream

If we fire up jconsole while we’re running and look at the Threads tab, we can compare the sequential and parallel version. In the parallel version we can see several ForkJoin threads doing the work.

I did some timings and got the following results [note this is not completely accurate since other tasks might have been running in the background on my machine – values are to the nearest half-second].

  • External, sequential (for-loop): 8.5 seconds
  • External, parallel (ForkJoin): 2.5 second
  • Internal, sequential (sequential stream): 21 seconds
  • Internal, parallel (parallel stream): 6 seconds

This is probably as expected. The amount of work per iteration in the inner loop is low, so any stream actions will have relatively high overhead as seen in the sequential stream version. The parallel stream comes in slightly faster than the for-loop, but the ForkJoin version outperforms it by a factor of more than 2. Note how simpler the streams version was [once we get the hang of streams of course] compared to the amount of code in the ForkJoin version.

Let’s have a look at the work-horse of this work distribution, the Spliterator. A Spliterator is an interface like an Iterator, but instead of just providing the next value, it can also divide work up into smaller pieces which are executed by ForkJoinTasks.

When we create a Spliterator we provide details of the size of the workload and characteristics that the values have. Some types of Spliterators such as RangeIntSpliterator [which IntRange supplies] use the characteristics() method to return characteristics, rather than having them supplied via a constructor like AbstractSpliterator does.

We obviously need the size of the workload so we can divide up the work up and know when to stop dividing. The characteristics we can supply are defined in the Spliterator interface as follows:

SIZED – we can supply a specific number of values that will be sent prior to processing (versus an InfiniteSupplyingSpliterator)
SUBSIZED – implies that any Spliterators that trySplit() creates will be SIZED and SUBSIZED. Not all SIZED Spliterators will split into SUBSIZED spliterators. The API gives an example of a binary tree where we might know how many elements are in the tree, but not in the sub-trees
ORDERED – we supply the values in sequence, for example from a list
SORTED – the order follows a sort order (rather than sequence); ORDERED must also be set
DISTINCT – each value is different from every other, for example if we supply from a set
NONNULL – values coming from the source will not be null
IMMUTABLE – it’s impossible to change the source (such as add or remove values) – if this is not set and neither is CONCURRENT we’re advised to check the documentation for what happens on modification (such as a ConcurrentModificationException)
CONCURRENT – the source may be concurrently modified safely and we’re advised to check the documentation on the policy

These characteristics are used by the splitting machinery, for example in the ForEachOps class (which is used to carry out tasks in a pipeline terminated with a forEach). Normally we can just use a pre-built Spliterator [and often don’t even need to worry about that because it’s supplied by the stream() method]. Remember the streams framework allows us to get work done without having to know all the details of how its being done. It’s only in the rare cases of a special problem or needing maximum performance do we have to worry.

Splitting is done by the trySplit() operation. This returns a new Spliterator. For the requirements of this function the API documentation should be referred to.

When we consume the contents of [part of] the stream in bulk using the Spliterator, the forEachRemaining(action) operation is called. This takes source data and calls the next action via the action’s accept call. For example if the next operation is filter, the accept call on filter is called. This calls the test method of the contained predicate, and if that is true, the accept method of the next stage is called. At some point a terminal stage will be called [the accept method calls no other stage] and the final value will be consumed, reduced or collected. When we call a stream() method, this pipeline is created and calling intermediate stages chains them to the end of the pipeline. Calling the final consuming stage makes the final link and sets everything off.

Alternatively when we need to generate each element from a non-bulk source, the tryAdvance() function is used. This is passed an action which accept is called on as before. However, we return true if we want to continue and false if we don’t. InfiniteSupplyingSpliterator for example always returns true, but we can use an AbstractSpliterator if we want to control this. Remember the AbstractIntSpliterator from our SixGame in the finite generators article? One of our tryAdvance functions was this:

@Override
public boolean tryAdvance(Consumer action)
{
  if (action == null)
    throw new NullPointerException();
  if (done)
    return false;

  action.accept(rollDie());

  return true;
}

In this case if we roll the die we always continue. This would allow the done logic to be set from elsewhere if we didn’t want to roll a die again. It might have been slightly better to have returned !done instead of true to terminate generation immediately as soon as the six was thrown. However in this case going through another cycle was hardly a chore.

That’s it for the streams overview. In the next article we’ll look a bit more at lambda expressions.

Advertisements

Finite Sequence Generators in Java 8 – Part 2

In the last couple of articles we looked at generators. First we looked at ways of generating an infinite sequence. In the second we saw a way of generating a finite sequence. Let’s look at a few more aspects before we move on.

In the finite sequence article, we saw that unless we wanted to limit ourselves to a certain number of values we couldn’t use generate and iterate in a simple manner. This was because there was no way of indicating a stop condition. Limit is fine if we know how many values we need, but not if we don’t. If we use limit we’d have to create a new stream to get further values. There are a couple of other methods we could use for generating finite sequences without having to resort to using Iterable.

Let’s go back to our die throwing SixGame example from the last article. Instead of using an Iterator/Iterable, we’ll use an IntSupplier coupled with IntStream’s generate method. If any of that is new to you, then first review the article on generators with infinite sequences. We’re going to attempt (and I’m not saying this is good practice) to stop generating when we get a Six by throwing an exception:

public class SixGame
{
	public static class DieThrowSupplier implements IntSupplier
	{
		private Random rand = new Random(System.nanoTime());
		private boolean done = false;

		@Override
		public int getAsInt()
		{
			if (!done)
			{
				int dieThrow = Math.abs(rand.nextInt()) % 6 + 1;

				if (dieThrow == 6)
				{
					done = true;
				}

				return dieThrow;
			}
			else
			{
				throw new NoSuchElementException();
			}
		}
	}

	public static void main(String args[])
	{
		DieThrowSupplier dieThrows = new DieThrowSupplier();

		IntStream myStream = IntStream.generate(dieThrows);
		
		try
		{
			myStream.mapToObj(i -> "You threw a " + i).forEach(
					System.out::println);
		}
		catch (NoSuchElementException e)
		{
			// Escaped
		}
	}
}

Something here that’s new is the mapToObj call. We’re starting out with an IntStream, but we want to create a message which is a String. Thus we need to change the ‘shape’ of the stream from Integer to Object (there is no special String stream) and we can do that with mapToObj. It works like map, but instead of expecting an Integer being returned from the function, it expects an Object.

We have to catch the exception, but luckily (or perhaps sloppily given this is a demonstration) we are using a side-effect to do something with the string we generate: printing in forEach. Once we go parallel though we need to remove side effects. Although we’ve not covered it yet, what we need to do is collect the results from the stream, perhaps in a list, and then perform the printing outside of the stream chain. Although this seems a lot for our simple game, getting streams to work properly in parallel is one of the more difficult tasks that we’re going to have to master eventually.

Our problem in the parallel world is going to be that we’re collecting, but we need to assign that collection to something when the stream is done. Try changing the try/catch code to the broken:

            List<String> l = null;

            try
            {
                    l = myStream.parallel().mapToObj(i -> "You threw a " + i)
                                           .collect(Collectors.toList());
            }
            catch (NoSuchElementException e)
            {
                    // Escaped
            }

            l.stream().forEach(System.out::println);

Nothing gets printed this time, and we crash with a NullPointerException. Given we throw an exception during the stream which we catch after the assignment and not as part of the stream, the assignment never happens. Thus the list, l, stays null. We went through all the motions and got nothing for our troubles. Perhaps we could try making special collectors to handle exceptions, but given an exception is almost certainly a side-effect we should avoid these when going parallel. I can also imagine that catching exception outside of a stream and hoping we still get all the results might be quite flaky as we’re relying on the implementation to make it work. Implementations change, and other implementations come along. My verdict is – unless Oracle say otherwise, is avoid.

We also discussed that we wanted to avoid implementing a whole spliterator if there was another way available. To recap, a spliterator is an iterator that can be split into batches of work and is what drives streams. Getting that right isn’t trivial. We saw that we couldn’t get access to override InfiniteSupplyingSpliterator in order to make a version we could terminate. However, there exists a spliterator that is just missing tryAdvance which we use to inject the next value into the stream and indicate when we’re done. This is AbstractSpliterator, in particular AbstractIntSpliterator, which we can extend. Let’s have a look at our game using one of those:

public class SixGame
{
	public static class DieThrowSpliterator extends
			Spliterators.AbstractIntSpliterator
	{
		private Random rand = new Random(System.nanoTime());
		private boolean done = false;

		protected DieThrowSpliterator()
		{
			super(Long.MAX_VALUE, 0);
		}

		private int rollDie()
		{
			int dieThrow = Math.abs(rand.nextInt()) % 6 + 1;

			if (dieThrow == 6)
			{
				done = true;
			}

			return dieThrow;
		}

		@Override
		public boolean tryAdvance(IntConsumer action)
		{
			if (action == null)
			{
				throw new NullPointerException();
			}
			
			if (done)
			{
				return false;
			}

			action.accept(rollDie());

			return true;
		}

		@Override
		public boolean tryAdvance(Consumer<? super Integer> action)
		{
			if (action == null)
 			{
				throw new NullPointerException();
  			}

  			if (done)
  			{
				return false;
  			}

  			action.accept(rollDie());

  			return true;
		}
	}

	public static void main(String args[])
	{
		Stream<Integer> stream = StreamSupport.stream
					(new DieThrowSpliterator(), false);

		stream.map(i -> "You threw a " + i)
	              .forEach(System.out::println);
	}
}

First notice that we are creating the stream the same way we did when using an Iterable, but instead we are creating a spliterator which we pass to the stream. The second thing to notice is that we have to implement two tryAdvance functions. These take Consumers which will use our value. The first is a true IntConsumer, where as the second is a Consumer of any type which can hold an Integer (Object, Number and Integer). I’ve kept the null check used in other spliterators. If we’re already done, we can return false, otherwise pass a roll to the action and return true. The parent constructor of our spliterator takes two values, the first being how many values we expect (we don’t know) and flags for characteristics of the spliterator (0 being none of them).

No doubt we could continue the discussion on generation, particularly as we now have several ways to solve problems. For now we’ll move on and look at a few more aspects of Java 8 functional programming and lambda expressions.

Finite sequence generators in Java 8

… and introducing default methods.

Last time we looked at generators, and more specifically those generating an infinite sequence. We saw that there were several ways to achieve this:

  • The older Java 7 way with an iterator like class
  • Using Stream’s iterate method
  • Using Stream’s generate method

We also saw that when using a Stream we had to use the limit method on our infinite sequence otherwise it would keep generating and the program couldn’t continue. The problem with using limit was that once we’d got the values, we couldn’t use the stream again to get more. To solve this problem, we used an IntSupplier and created several streams with it to process batches of values.

What if the sequence we wanted to process was finite. We want to avoid buffering up values in advance. We also want to consume the sequence without knowing up front how many values it will return because of reasons such as:

  • We can’t work it out or it’s difficult to
  • There is a random element
  • We want to decouple the generating function from stream doing the consuming

We saw a simple finite sequence with our Hello World! example in the first article. In this case we were not generating the sequence with a function, we were instead processing a pre-initialised list. We also saw we could use an iterator when we discussed infinite sequences, but went on to discuss other ways. We’ll see that in this case, we are virtually forced into the Iterator solution.

Let’s take a simple sequence which we can’t know the length of. We’ll simulate a game where the idea is to keeping throwing a die until we get a six. First we’ll start with a non-functional implementation:

public class SixGame
{
	public static class DieThrowIterator implements Iterator<Integer>
	{
		private int dieThrow = 0;
		private Random rand = new Random(System.nanoTime());

		@Override
		public boolean hasNext()
		{
			return dieThrow != 6;
		}

		@Override
		public Integer next()
		{
			dieThrow = Math.abs(rand.nextInt()) % 6 + 1;
			return dieThrow;
		}
	}

	public static void main(String args[])
	{
		DieThrowIterator dieThrowIterator = new DieThrowIterator();

		while (dieThrowIterator.hasNext())
		{
			System.out.println("You threw a " + dieThrowIterator
                                  .next());
		}
	}
}

(Note our random number generation is not very robust, but for a simple demonstration it will do).

Let’s try to use a Stream. We can’t use Stream’s iterate function because it doesn’t allow a stop condition. Also an IntSupplier with generate is not an option unless we want to use limit(1) and create a stream for each die throw, when we might as well not use a stream at all.

If we look under the hood at the implementation of IntStream’s generate function we see that it creates an InfiniteSupplyingSpliterator. This has a problem for us – the tryAdvance function always returns true meaning we can never stop.

We could implement our own Spliterator where our tryAdvance checks for a stop condition. We can’t simply extend InfiniteSupplyingSpliterator and override the tryAdvance method since it’s an inner class of a default access class. So the only way is a cut and paste job rather than inheritance. I’m very nervous about copying large parts of code which might change in future versions; this is saying to me it’s not what was intended. We should look for other ways first.

Let’s look to see how List does its streaming. List’s streaming comes from inheriting Iterable. To create an Iterable we only need to implement its iterator() function – for the example let’s do so as an inner-class:

	public static class DieThrowIterable implements Iterable<Integer>
	{
		@Override
		public Iterator<Integer> iterator()
		{
			return new DieThrowIterator();
		}
	}

We can then stream:

	public static void main(String args[])
	{
		Stream<Integer> stream = StreamSupport.stream(
				new DieThrowIterable().spliterator(), false);

		stream.map(i -> "You threw a " + i).forEach(System.out::println);
	}

Wait a moment… Iterable is an interface, yet it has a spliterator() method implemented. How can that be?

If we look at the interface, there is indeed a spliterator() method creating a new spliterator for us. If we also look closely, we see the default keyword. This was added in Java 8 (cunningly default is already a reserved word for switch statements so it won’t break old code). When we write interfaces we can provide default methods already implemented. Now some people might have reservations about this as it’s turning an interface in to effectively an abstract class. There are some good reasons and advantages this gives us:

  • It avoids having to create abstract classes to implement interfaces to supply default methods. Thus there is less boiler-plate to write, and also prevents the consumer of the API implementing the interface when we expected the abstract class to be used instead.

  • It helps with multiple inheritance issues (inherting from two or more classes which C++ supports, but not Java). In Java, the problems with multiple inheritance were avoided by allowing only single inheritance from classes, but we can implement as many interfaces as desired. Problem is we had to implement large portions of those interfaces in each class that used them – a real pain.

  • Our API vendors can also now add new methods to interfaces without breaking old code. This is one reason we see a default method here. A new interface would make the JDK even larger and make things more complicated.

  • Lambda expressions bind to interfaces with one single method left to implement. If there were more we couldn’t do this binding. We’ll look at this in a future article.

Note: If we implement more than one interface with an identical default method, and we do not override it by implementing in the class or a parent class, this is an error. Whether eventually it’ll be possible to use Scala-like mix-in traits is an interesting question.

So using Iterable is one way to do finite sequence generators, although the documentation accompanying the default spliterator() method suggests it should be overridden for better performance. In a later article we’ll come back and look at spliterators in some more detail.

In the next article we’ll look another couple of ways to implement finite sequence generators, one good, and one we should avoid.