
I have a large Java 8 Stream (Stream<MyObject>) with objects that look like this:

class MyObject {
   private String string;
   private Date timestamp;

   // Getters and setters removed for brevity
}

I know that all timestamps for day 1 will arrive before those in day 2, but within each day the timestamps could be out of order. I'd like to sort the MyObjects in timestamp order on a per-day basis using the Stream API. Since the Stream is large, I have to do this as lazily as possible, i.e. it would be OK to hold one day's worth of MyObjects in memory, but it would not be OK to hold much more than that.

How can I achieve this?

Update 2017-04-29:

A requirement is that I want to continue working on the same stream after the sorting! I'd like something like this (pseudo code):

Stream<MyObject> sortedStream = myStreamUnsorted().sort(onADailyBasis());
  • Is the question more about scheduling (when to do it) or processing (how to do it)? Are you using the Spring stack? Commented Apr 23, 2017 at 12:48
  • I think most likely you'll have to work with the Stream's iterator in order to group the elements day by day before sorting. I don't think the Stream API can help you much with this kind of requirement. Commented Apr 23, 2017 at 12:53
  • @AndrewTobilko It's about processing and I'm not using Spring. Commented Apr 23, 2017 at 12:56
  • What is the smallest TimeUnit between your timestamps? Are we talking about seconds or even less? Commented Apr 23, 2017 at 13:00
    I'm afraid that Java streams are not well suited for sorting, especially if there are a lot of elements. Commented Apr 23, 2017 at 13:03

3 Answers


I'd suggest the following solution:

Store each value of your stream in a TreeMap to get it sorted immediately. Use the object's timestamp as the key.

 Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
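One caveat with keying the map by timestamp: Map.put() replaces the value of an existing key, so if two objects share a timestamp, only the later one stays in the map. The sketch below (TimestampBuckets and its minimal MyObject are hypothetical stand-ins) contrasts that with a TreeMap<Date, List<MyObject>> that keeps every object:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TimestampBuckets {

    // Minimal stand-in for the question's MyObject (assumption: a name plus a Date timestamp)
    static class MyObject {
        final String name;
        final Date timestamp;

        MyObject(String name, Date timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        Date getTimestamp() {
            return timestamp;
        }
    }

    // One value per key: a second object with the same timestamp replaces the first
    static Map<Date, MyObject> byTimestamp(List<MyObject> objects) {
        Map<Date, MyObject> map = new TreeMap<>();
        for (MyObject o : objects) {
            map.put(o.getTimestamp(), o);
        }
        return map;
    }

    // One list per key: objects with equal timestamps are kept, in arrival order
    static Map<Date, List<MyObject>> bucketsByTimestamp(List<MyObject> objects) {
        Map<Date, List<MyObject>> map = new TreeMap<>();
        for (MyObject o : objects) {
            map.computeIfAbsent(o.getTimestamp(), k -> new ArrayList<>()).add(o);
        }
        return map;
    }

    public static void main(String[] args) {
        Date same = new Date(1_000L);
        List<MyObject> objects = Arrays.asList(
                new MyObject("a", same),
                new MyObject("b", same),
                new MyObject("c", new Date(500L)));
        System.out.println(byTimestamp(objects).size());                  // 2: "a" was replaced by "b"
        System.out.println(bucketsByTimestamp(objects).get(same).size()); // 2: both "a" and "b" kept
    }
}
```

Whether this matters depends on how fine-grained the timestamps are, which is what one of the comments on the question asks.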

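We need to know which object has to be removed from the map at the end. It will be only one object, but the variable that stores it has to be (effectively) final to be used inside a lambda. So I chose a plain list with a single slot.

 // one null slot, so that set(0, ...) works (it throws on an empty ArrayList)
 List<MyObject> lastObject = Arrays.asList(new MyObject[1]);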
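For comparison, here is a small sketch of two common holders for a value written from inside a lambda: the one-element array suggested in the comments on this answer, and java.util.concurrent.atomic.AtomicReference (a swapped-in alternative, not what this answer uses). The LastSeen class and its methods are hypothetical names:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

class LastSeen {

    // A lambda may not assign a local variable, but it may write into an
    // (effectively final) one-element array
    static <T> T lastViaArray(Stream<T> stream) {
        Object[] holder = new Object[1];
        stream.forEach(t -> holder[0] = t);
        @SuppressWarnings("unchecked")
        T last = (T) holder[0];
        return last;   // null if the stream was empty
    }

    // The same idea with AtomicReference, which states the intent more directly
    static <T> T lastViaAtomicReference(Stream<T> stream) {
        AtomicReference<T> holder = new AtomicReference<>();
        stream.forEach(holder::set);
        return holder.get();   // null if the stream was empty
    }
}
```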

Store the current day of the month as an int.

 // just an example
 int currentDay = 23;

Use a predicate that determines whether the day of a passing object differs from currentDay.

 Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDate()
                    .getDayOfMonth() != currentDay;
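Note that getDayOfMonth() compares only the day of the month, so 23 April and 23 May look the same to this predicate. A minimal sketch that compares the whole LocalDate instead (DayPredicates, dayOf, notOn and the utc helper are hypothetical names, and UTC replaces ZoneId.systemDefault() so the result does not depend on the machine's time zone):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.function.Predicate;

class DayPredicates {

    // Assumption: UTC instead of ZoneId.systemDefault(), to keep results deterministic
    static final ZoneId ZONE = ZoneOffset.UTC;

    static LocalDate dayOf(Date timestamp) {
        return timestamp.toInstant().atZone(ZONE).toLocalDate();
    }

    // Compares only the day of the month, like the predicate above
    static Predicate<Date> notDayOfMonth(int dayOfMonth) {
        return d -> dayOf(d).getDayOfMonth() != dayOfMonth;
    }

    // Compares the whole calendar date, so 23 April and 23 May are different days
    static Predicate<Date> notOn(LocalDate day) {
        return d -> !dayOf(d).equals(day);
    }

    // Hypothetical helper: builds a Date for the given UTC date and hour
    static Date utc(int year, int month, int day, int hour) {
        return Date.from(LocalDateTime.of(year, month, day, hour, 0).toInstant(ZoneOffset.UTC));
    }
}
```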

Now process the stream. Use peek() twice: first to put the object into the map, second to overwrite the object in the list. Use anyMatch() as the terminal operation and pass in the predicate created above. As soon as the first object from the next day appears, anyMatch() short-circuits and returns true.

 boolean reachedNextDay = stream
       .peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
       .peek(myObject -> lastObject.set(0, myObject))
       .anyMatch(predicate);

If that happened, remove the last object that passed by: it already belongs to the next day and therefore does not belong in your map. If the stream ended without reaching the next day, the last object belongs to the current day and must stay.

 if (reachedNextDay) {
     objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
 }

Done. You now have a sorted map of objects that all belong to a single day. I hope this matches your expectations. Here is the entire code in one block for easier copying:

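 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.function.Predicate;

 Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
 // one null slot, so that set(0, ...) works (it throws on an empty ArrayList)
 List<MyObject> lastObject = Arrays.asList(new MyObject[1]);

 // just an example
 int currentDay = 23;

 Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDate()
                    .getDayOfMonth() != currentDay;

 boolean reachedNextDay = stream
       .peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
       .peek(myObject -> lastObject.set(0, myObject))
       .anyMatch(predicate);

 // only drop the last object if it really belongs to the next day
 if (reachedNextDay) {
     objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
 }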
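To try the approach end to end, here is a self-contained sketch (the FirstDayDemo class, its minimal MyObject and the utc helper are hypothetical; UTC replaces ZoneId.systemDefault() to keep the result deterministic). Feeding the stream from an Iterator makes it visible that anyMatch() stops pulling elements right after the first object of the next day, and that this object is taken from the source and then dropped:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class FirstDayDemo {

    // Minimal stand-in for the question's MyObject (assumption)
    static class MyObject {
        final String name;
        final Date timestamp;

        MyObject(String name, Date timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        Date getTimestamp() {
            return timestamp;
        }
    }

    // Hypothetical helper: 2017-04-<day> <hour>:00 UTC
    static Date utc(int day, int hour) {
        return Date.from(LocalDateTime.of(2017, 4, day, hour, 0).toInstant(ZoneOffset.UTC));
    }

    // Reads the objects of currentDay from the iterator, sorted by timestamp
    static Map<Date, MyObject> firstDay(Iterator<MyObject> source, int currentDay) {
        Stream<MyObject> stream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED), false);

        Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
        MyObject[] lastObject = new MyObject[1];
        Predicate<MyObject> otherDay = o -> o.getTimestamp().toInstant()
                .atZone(ZoneOffset.UTC).toLocalDate().getDayOfMonth() != currentDay;

        boolean reachedNextDay = stream
                .peek(o -> objectsOfTheDaySorted.put(o.getTimestamp(), o))
                .peek(o -> lastObject[0] = o)
                .anyMatch(otherDay);
        if (reachedNextDay) {
            // belongs to the next day; it has already been taken from `source`
            objectsOfTheDaySorted.remove(lastObject[0].getTimestamp());
        }
        return objectsOfTheDaySorted;
    }
}
```

With objects a (23rd, 10:00), b (23rd, 08:00), c (24th, 09:00) and d (24th, 07:00), the map holds b and a, c is gone, and only d is left in the iterator, which is why this approach alone cannot simply continue with the next day.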

Comments

You might want to use set(0, myObject) instead of add. Otherwise, the lastObject list gets quite large. Alternatively, make lastObject an array of length 1 (lastObject[0] = myObject).
@Malte Hartwig: Thanks for the hint. I had totally overlooked this, although I actually intended to use the set() method. I've edited my post.
This will not work if the stream starts with objects from before the specified day, as the predicate will match the first one and terminate the stream right away. Furthermore, you use getDayOfMonth(), which cannot tell the same day of different months apart (for example, 23 April and 23 May).
Well, I was aware of that. I only focused on the description above, which says that the stream starts with one day, delivers all the objects of that day and eventually reaches the next day. There is certainly room for optimization.
@MalteHartwig That's because “This method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline”. See also is peek really only for debugging?

It depends whether you need to process the objects of all days or one specific day.

Building on DiabolicWords's answer, this is an example to process all days:

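TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate[] currentDay = new LocalDate[1];
incoming.peek(o -> {
    LocalDate date = o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    if (!date.equals(currentDay[0]))
    {
        // check the array slot, not the array itself (which is never null)
        if (currentDay[0] != null)
        {
            processOneDaysObjects(currentDaysObjects);
            currentDaysObjects.clear();
        }
        currentDay[0] = date;
    }
}).forEach(currentDaysObjects::add);

// the last day has no following day to trigger it, so process it explicitly
if (!currentDaysObjects.isEmpty())
{
    processOneDaysObjects(currentDaysObjects);
}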

This will collect the objects for one day, process them, reset the collection and continue with the next day.
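A self-contained sketch of the same batching idea, generalised into a hypothetical forEachDay helper that takes a day extractor, a comparator and a callback. It assumes, as the question states, that each day's objects arrive together; it sorts a List instead of using a TreeSet, so objects with equal timestamps are not merged, and it processes the last day after the stream ends:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

class DailyBatches {

    // Sorts each run of consecutive elements that share a day and hands it to `process`.
    // Assumption: the input is already grouped by day (all of day 1 before day 2).
    static <T> void forEachDay(Stream<T> stream,
                               Function<? super T, LocalDate> dayOf,
                               Comparator<? super T> order,
                               Consumer<List<T>> process) {
        List<T> batch = new ArrayList<>();
        LocalDate[] currentDay = new LocalDate[1];
        stream.forEach(o -> {
            LocalDate date = dayOf.apply(o);
            if (currentDay[0] != null && !date.equals(currentDay[0])) {
                flush(batch, order, process);   // a new day starts: hand over the previous one
            }
            currentDay[0] = date;
            batch.add(o);
        });
        flush(batch, order, process);   // the last day has no successor to trigger it
    }

    private static <T> void flush(List<T> batch, Comparator<? super T> order, Consumer<List<T>> process) {
        if (batch.isEmpty()) {
            return;
        }
        batch.sort(order);                       // sorting a List keeps objects with equal timestamps
        process.accept(new ArrayList<>(batch));  // hand over a copy, then reuse the buffer
        batch.clear();
    }
}
```

With the question's class, this would be called with o -> o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate() as the day extractor and Comparator.comparing(MyObject::getTimestamp) as the order.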

If you only want one specific day:

TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate specificDay = LocalDate.now();
incoming.filter(o -> !o.getTimestamp()
                       .toInstant()
                       .atZone(ZoneId.systemDefault())
                       .toLocalDate()
                       .isBefore(specificDay))
        .peek(o -> currentDaysObjects.add(o))
        .anyMatch(o -> {
            if (o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().isAfter(specificDay))
            {
                currentDaysObjects.remove(o);
                return true;
            }
            return false;
        });

The filter skips objects from before specificDay, and anyMatch terminates the stream at the first object after specificDay.

I have read that Java 9 adds takeWhile and dropWhile to streams. These would make this a lot easier.
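With Java 9 or later, a sketch of the specific-day case using those two methods could look like this (the OneDayJava9 class and objectsOf method are hypothetical; the day extractor and comparator are passed in so the sketch does not depend on MyObject):

```java
import java.time.LocalDate;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OneDayJava9 {

    // Requires Java 9+ (Stream.dropWhile and Stream.takeWhile).
    // Assumption: the stream is ordered by day, as stated in the question.
    static <T> List<T> objectsOf(Stream<T> stream, LocalDate day,
                                 Function<? super T, LocalDate> dayOf,
                                 Comparator<? super T> order) {
        return stream
                .dropWhile(o -> dayOf.apply(o).isBefore(day))  // skip earlier days
                .takeWhile(o -> dayOf.apply(o).equals(day))    // stop at the first later day
                .sorted(order)                                 // buffers only what takeWhile lets through
                .collect(Collectors.toList());
    }
}
```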

Edit after the OP specified the goal in more detail

Wow, this is a nice exercise, and quite a tough nut to crack. The problem is that the obvious solution (collecting the stream) always goes through the whole stream. With the Stream API alone, you cannot take the next x elements, sort them, stream them, and then repeat; the sort would be applied to the whole stream (i.e. all days) at once. For the same reason, calling sorted() on a stream goes through it completely (especially as the stream does not know that the elements are already grouped by day). For reference, read this comment: https://stackoverflow.com/a/27595803/7653073.
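The claim about sorted() can be checked with a small experiment (a hypothetical SortedIsABarrier class): count how many source elements pass a peek() before findFirst() returns. Without sorted(), findFirst() stops after the first element; with sorted(), every element has to be read first, because the smallest one could be the last.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SortedIsABarrier {

    // Counts how many source elements are pulled before findFirst() has its answer
    static int pulledBeforeFirst(boolean sortFirst) {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> s = Stream.of(5, 3, 9, 1, 7).peek(x -> pulled.incrementAndGet());
        if (sortFirst) {
            s = s.sorted();
        }
        s.findFirst();
        return pulled.get();
    }
}
```

Here pulledBeforeFirst(false) returns 1 and pulledBeforeFirst(true) returns 5.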

As recommended there, here is an Iterator implementation wrapped in a stream that looks ahead in the original stream, takes the elements of one day, sorts them, and gives you the whole thing as a new stream (without keeping all days in memory!). The implementation is more complicated because there is no fixed chunk size: we always have to read the first element of the following day to know when to stop.

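import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Assumes MyObject.getTimestamp() returns a LocalDateTime, as in the main() example below
public class DayByDayIterator implements Iterator<MyObject>
{
    private Iterator<MyObject> incoming;
    private MyObject next;

    private Iterator<MyObject> currentDay;

    private MyObject firstOfNextDay;
    private Set<MyObject> nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));

    public static Stream<MyObject> streamOf(Stream<MyObject> incoming)
    {
        Iterable<MyObject> iterable = () -> new DayByDayIterator(incoming);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private DayByDayIterator(Stream<MyObject> stream)
    {
        this.incoming = stream.iterator();
        firstOfNextDay = incoming.next();   // fails on an empty stream, see below
        nextDaysObjects.add(firstOfNextDay);
        next();                             // pre-fetch the first element
    }

    @Override
    public boolean hasNext()
    {
        return next != null;
    }

    @Override
    public MyObject next()
    {
        // switch days when the current one is used up and another day has been started
        if (currentDay == null || !currentDay.hasNext() && !nextDaysObjects.isEmpty())
        {
            nextDay();
        }

        MyObject result = next;

        if (currentDay.hasNext())
        {
            this.next = currentDay.next();
        }
        else
        {
            this.next = null;
        }

        return result;
    }

    private void nextDay()
    {
        boolean dayChanged = false;
        while (incoming.hasNext())
        {
            MyObject previous = firstOfNextDay;
            firstOfNextDay = incoming.next();
            if (!previous.getTimestamp().toLocalDate().isEqual(firstOfNextDay.getTimestamp().toLocalDate()))
            {
                dayChanged = true;   // firstOfNextDay is the first element of the following day
                break;
            }
            nextDaysObjects.add(firstOfNextDay);
        }
        currentDay = nextDaysObjects.iterator();

        // start a fresh set for the following day; it stays empty once the stream is exhausted,
        // so a following day with a single element is no longer dropped
        nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
        if (dayChanged)
        {
            nextDaysObjects.add(firstOfNextDay);
        }
    }
}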

Use it like this:

public static void main(String[] args)
{
    Stream<MyObject> stream = Stream.of(
            new MyObject(LocalDateTime.now().plusHours(1)),
            new MyObject(LocalDateTime.now()),
            new MyObject(LocalDateTime.now().plusDays(1).plusHours(2)),
            new MyObject(LocalDateTime.now().plusDays(1)),
            new MyObject(LocalDateTime.now().plusDays(1).plusHours(1)),
            new MyObject(LocalDateTime.now().plusDays(2)),
            new MyObject(LocalDateTime.now().plusDays(2).plusHours(1)));

    DayByDayIterator.streamOf(stream).forEach(System.out::println);
}

------------------- Output -----------------

2017-04-30T17:39:46.353
2017-04-30T18:39:46.333
2017-05-01T17:39:46.353
2017-05-01T18:39:46.353
2017-05-01T19:39:46.353
2017-05-02T17:39:46.353
2017-05-02T18:39:46.353

Explanation: currentDay and next are the basis for the iterator, while firstOfNextDay and nextDaysObjects already look ahead at the next day. When currentDay is exhausted, nextDay() is called; it keeps adding incoming's elements to nextDaysObjects until it reaches the first element of the following day, then turns nextDaysObjects into currentDay.

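One thing: if the incoming stream is null or empty, it will fail. You can test for null, but the empty case requires catching the NoSuchElementException thrown by incoming.next() in the factory method. I did not add this, for readability. Also note that a TreeSet ordered by timestamp treats objects with equal timestamps as duplicates and keeps only one of them.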
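As an alternative to pre-fetching with next, the same look-ahead can be written as a Spliterator whose tryAdvance() pulls from the source only when the current day is used up. This is a sketch that swaps the Iterator for Spliterators.AbstractSpliterator (SortedPerDay and sortedPerDay are hypothetical names); it returns an empty stream for empty input and sorts a List, so objects with equal timestamps are not dropped:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedPerDay {

    // Lazily sorts each day's elements while keeping the days in their original order.
    // Assumptions: the source delivers all elements of one day before the next day
    // starts, and contains no null elements.
    static <T> Stream<T> sortedPerDay(Stream<T> source,
                                      Function<? super T, LocalDate> dayOf,
                                      Comparator<? super T> order) {
        Spliterator<T> in = source.spliterator();
        Spliterator<T> out = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            private Iterator<T> currentDay = Collections.emptyIterator();
            private T pending;   // first element of the following day, already read from `in`

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!currentDay.hasNext() && !loadNextDay()) {
                    return false;
                }
                action.accept(currentDay.next());
                return true;
            }

            // Reads one whole day from `in` (plus the first element of the next day),
            // sorts it and makes it the current day. Returns false when nothing is left.
            private boolean loadNextDay() {
                List<T> day = new ArrayList<>();
                if (pending != null) {
                    day.add(pending);
                    pending = null;
                } else if (!in.tryAdvance(day::add)) {
                    return false;   // source exhausted; also covers an empty source
                }
                LocalDate date = dayOf.apply(day.get(0));
                boolean more = true;
                while (more && pending == null) {
                    more = in.tryAdvance(t -> {
                        if (dayOf.apply(t).equals(date)) {
                            day.add(t);
                        } else {
                            pending = t;   // belongs to the following day
                        }
                    });
                }
                day.sort(order);   // a List keeps elements that compare as equal
                currentDay = day.iterator();
                return true;
            }
        };
        return StreamSupport.stream(out, false).onClose(source::close);
    }
}
```

Applied to the question's type, this would read Stream<MyObject> sortedStream = SortedPerDay.sortedPerDay(unsortedStream, o -> o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), Comparator.comparing(MyObject::getTimestamp)), which is close to the pseudo code in the question.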

I hope this is what you need, let me know how it goes.

Comments

My problem is that I'd like to continue working with same stream afterwards which I'm not quite sure how to achieve with your suggested solution. I've updated the question to mention this in a clearer way.
@Johan I have added an alternative solution. It basically uses an Iterator instead of a Stream, but I added a factory method that wraps that iterator into a stream again. Use it as follows: Stream<MyObject> sortedStream = DayByDayIterator.streamOf(unsortedStream)

If you consider an iterative approach, I think it becomes much simpler:

TreeSet<MyObject> currentDayObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate currentDay = null;
// the cast gives the method reference a target type; an enhanced for loop does not
for (MyObject m : (Iterable<MyObject>) stream::iterator) {
    LocalDate objectDay = m.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    if (currentDay == null) {
        currentDay = objectDay;
    } else if (!currentDay.equals(objectDay)) {
        // process a whole day of objects at once
        process(currentDayObjects);
        currentDay = objectDay;
        currentDayObjects.clear();
    }
    currentDayObjects.add(m);
}
// process the data of the last day (unless the stream was empty)
if (!currentDayObjects.isEmpty()) {
    process(currentDayObjects);
}
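The cast in the loop matters: an enhanced for loop does not provide a target type for a method reference, so (Iterable<MyObject>) stream::iterator is needed for it to compile. Because Stream.iterator() is a terminal operation, such an Iterable can only be iterated once. The sketch below (a hypothetical StreamAsIterable class, using strings for brevity) shows both points:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StreamAsIterable {

    // Iterates a stream with an enhanced for loop; the cast supplies the target type
    static List<String> copy(Stream<String> stream) {
        List<String> out = new ArrayList<>();
        for (String s : (Iterable<String>) stream::iterator) {
            out.add(s);
        }
        return out;
    }

    // Returns true if asking the same stream for a second iterator fails
    static boolean secondIteratorFails() {
        Stream<String> stream = Stream.of("a", "b");
        Iterable<String> iterable = stream::iterator;
        iterable.iterator();   // first call consumes the stream
        try {
            iterable.iterator();
            return false;
        } catch (IllegalStateException e) {
            return true;       // the stream has already been operated upon
        }
    }
}
```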
