0

I have a process that needs to intermittently move some files from one directory to another. Before I move the files, I need to ensure that a user isn't currently adding files to the directory - if they are, I want to wait until no new files have been added to the directory for N seconds before continuing the process (the value of N doesn't really matter here).

My original idea was to use a FileSystemWatcher to capture file creation events, use the .Throttle() method from Rx to get the last event generated within the specified period of time, and use result of that to disable an infinite delay loop. The problem is, this method will just wait forever if a user never makes any changes to the directory. Somehow, I need this to wait at least N seconds, and if there is no change within that period of time, continue. If there is a change within that period of time, then wait until N seconds have passed since the last change.

Here's what I have so far:

Console.WriteLine("start");

// wait to ensure no changes are being made to the directory
await Task.Run(async () =>
{
    var watcher = new FileSystemWatcher("D:\\Users\\mwcjk1\\source\\repos\\RandomTestAppNet8\\RandomTestAppNet8\\");
    watcher.EnableRaisingEvents = true;

    watcher.Created += (o, a) => { Console.WriteLine("file changed!"); };

    var subject = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        ev => watcher.Created += ev,
        ev => watcher.Created -= ev
        );

    bool done = false;

    // wait for 7 seconds after the last file creation
    var disposable = subject.Throttle(TimeSpan.FromSeconds(7)).Subscribe(events => { done = true; });

    Console.WriteLine("Set up observer");

    while (!done)
    {
        await Task.Delay(1000);
        //Console.WriteLine("Waited 1 sec");
    }

    disposable.Dispose();
});

// Continue with the rest of the method execution
Console.WriteLine("done!");

I also don't like the Task.Delay() loop - it feels like there should be a much more elegant way of doing this (.Throttle() seems so close to working on its own!), but I haven't been able to figure it out.

Any advice is much appreciated, thanks!

7
  • Who tells you that not just in the moment that 1 second has passed another user starts working in that directory? How do you want to distinguish changes that you made from changes someone else made when they started working in the directory while your task is running? Commented Apr 3 at 21:04
  • @Psi I agree, there's no way to guarantee that someone won't start making changes as soon as the rest of the method begins executing. I'm planning on implementing other error handling later on, but in the majority of cases users will add a batch of files all at once, then not add anything else for an extended period of time. Commented Apr 3 at 21:06
  • I smell Murphy lurking behind the next corner. However, you might want to use a simple Timer that checks the modification date periodically and tests whether the latest modification is more than 1 second in the past. If you want to avoid lazy polling, you'll have to create a Windows-Hook (read the Windows-API) which most certainly involves writing C++ code that then sends messages to your application whenever a change was made (quite overkill if you ask me). Commented Apr 3 at 21:11
  • @Psi I appreciate your concern, but I'm mainly doing this to avoid inconveniencing a user - if that situation does arise, the worst case is they have to do some work twice. Unfortunately, there's no way for me to know in advance whether a user will be modifying the file system or not. My goal is to lock all the files ASAP, but even that introduces some race conditions. I'm not sure I understand your suggestion about the Timer - are you saying to have the FileSystemWatcher reset the timer whenever an event occurs, then have a delay loop that waits for the timer to complete? Commented Apr 3 at 21:17
  • 1
    You can use a FileSystemWatcher have your Delay reset and restart. You can avoid the FileSystemWatcher by checking the modification date of the directory periodically manually in a Timer-event. When you detect that the modification date changed and that this change is more than 1 second in the past, start your task. Commented Apr 3 at 21:20

5 Answers 5

1

Firstly, for this:

I also don't like the Task.Delay() loop - it feels like there should be a much more elegant way of doing this

A TaskCompletionSource is an adapter from events/reactive world to Task world. Basically, it exposes a Task that can be completed, faulted or canceled by calling methods on the completion source.

You can rewrite your code with Task.Delay to this:

    var completion = new TaskCompletionSource();

    // wait for 7 seconds after the last file creation
    var disposable = subject.Throttle(TimeSpan.FromSeconds(7)).Subscribe(_ => completion.TrySetResult());

    Console.WriteLine("Set up observer");

    await completion.Task; // Will wait until Throttle/Subscribe is called

    disposable.Dispose();

Secondly, you also mention

The problem is, this method will just wait forever if a user never makes any changes to the directory.

This is because you're waiting for the event to fire but the event won't fire until changes are made to the directory.

What I'd suggest instead is to check whether the directory has had any changes recently and if it has, wait for a bit and try again. For example:

while (!shutdownToken.IsCancellationRequested) // Your main driving loop, not sure what that is
{
    if (HasAnyRecentChanges(theDirectoryOfInterest))
    {
        await Task.Delay(debounceTime /* N in your question */, shutdownToken);
        continue;
    }

    ProcessFiles(theDirectoryOfInterest);
    await WaitUntilItsTimeToRunTheMainLoopAgainAsync(shutdownToken);
}

Now the question is what the HasAnyRecentChanges implementation will look like. I suggest to use simplest possible thing to start with - just check last modification times of files in the directory.

This may be slow for a large number of files so using FileSystemWatcher and keeping the time of last modification in a field could be an optimization, especially if you run this really frequently or for small N. However, loading the last modification times of all files would still be required to initialize the field at start-up so that implementation will be needed in either case.

Sorry for no RX, I don't think I could make it cleaner with it.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks for the detailed answer! I'm actually glad you suggested a method that doesn't require Rx / FileSystemWatcher, because this is much simpler than what I had in my head. If I needed the granularity of FileSystemWatcher to distinguish between types of changes this could become problematic, but it turns out Directory.GetLastWriteTimeUtc() works great as my HasAnyRecentChanges implementation.
1

One possible approach is to use a timer that is reset each time the user makes a change. Something like:

public class MyClass
{
    private readonly TimeSpan delay;
    private System.Threading.Timer timer;
    private ManualResetEventSlim resetEvent = new(true);

    public MyClass(TimeSpan delay)
    {
        this.delay = delay;
        timer = new Timer(MyDelayedMethod, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    public void OnFileSystemEvent()
    {
        resetEvent.Reset();
        timer.Change(delay, Timeout.InfiniteTimeSpan);
    }

    private void MyDelayedMethod(object _) => resetEvent.Set();
    public void WaitForQuietPeriod() => resetEvent.Wait();
}

This is fairly low level code, using blocking code rather than anything asynchronous, but it is still fairly straightforward. If you need async code you could probably replace the resetEvent with a TaskCompletion source. Making the FileSystemWatcher call OnFileSystemEvent is left as an exercise.

You probably want to make this a long lived object, so that it can monitor for changes even when you are not copying any files. Alternatively, check the modified date on any files on startup, and initialize the reset event to false if any file has been modified within the delay-time, as well as initializing the timer with the remaining due-time.

1 Comment

This is great, thank you! I wish I could mark two answers as accepted. I ended up going with @Zdeněk's answer due to its simplicity, but I think this would be better for anyone that needed the more detailed reporting that FileSystemWatcher provides. After some further Googling, I realized Directory.GetLastWriteTime is good enough for what I need, which is much simpler than dealing with event callbacks.
1

For what it's worth, here's the implementation I made after taking input from a few of the other answers / comments:

private async Task WaitOnDirectoryChanges(string directory, int timeoutMillis)
{
    int delayTime = timeoutMillis;

    while (true)
    {
        await Task.Delay(delayTime);
        _cancellationToken.ThrowIfCancellationRequested();

        var lastWrite = Directory.GetLastWriteTimeUtc(directory);
        var lastWriteMillisecondsAgo = (int)Math.Round((DateTime.UtcNow - lastWrite).TotalMilliseconds);

        if (lastWriteMillisecondsAgo <= delayTime)
        {
            delayTime = timeoutMillis - lastWriteMillisecondsAgo;
            _logger.Debug("Detected changes in {directory} within the timeout period of {timeoutMillis}ms. Waiting {delayTime}ms for further changes.", directory, timeoutMillis, delayTime);
        }
        else
        {
            break;
        }
    }
}

Comments

0

You may want to look up the term "debouncer". Here is a simple implementation I haven't tested:
https://gist.github.com/lcnvdl/43bfdcb781d799df6b7e8e66fe3792db

But looks fine to me.

You just call Debounce() every time something changes on your directory and pass your method that will copy the files.

1 Comment

The code sample uses Thread.Abort which won't work on .NET Core.
0

Try this, input - is the source of your filesystem watcher events:

public static IObservable<Unit> EmitOnTimeout<T>(this IObservable<T> input, TimeSpan expirationTimeout)
{
    var postponerSink = new Subject<T>();
    var emitter = postponerSink
    .Synchronize()
    .StartWith(default(T))
    .Select(_ => Observable.Return(Unit.Default).DelaySubscription(expirationTimeout))
    .Switch();

    return  Observable.Using(() => new CompositeDisposable(input.Subscribe(postponerSink), postponerSink), _ => emitter);
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.