I have the following situation:

My project has many threads, and each thread processes one "key" at a time.

No two threads may process the same "key" at the same time. However, my project processes a very large number of keys, so I can't keep all of them in memory. I only need to record in memory that a thread is currently processing a given "key", so that any other thread trying to process the same "key" waits in a lock clause.

Now I have the following structure:

public class Lock
{
    private static object _lockObj = new object();
    private static List<object> _lockListValues = new List<object>();

    public static void Execute(object value, Action action)
    {
        lock (_lockObj)
        {
            if (!_lockListValues.Contains(value))
                _lockListValues.Add(value);
        }

        lock (_lockListValues.First(x => x.Equals(value)))
        {
            action.Invoke();
        }
    }
}

It works fine, but the keys are never removed from memory. The main difficulty is the multithreading, because any "key" can be processed at any time.

How can I solve this without a global lock that is independent of the keys?

  • Could you not use a concurrent queue? Make a single thread that passes the keys onto the queue, and have the other threads each pull one key from the queue and process it. By using the concurrent queue you can ensure that only one thread has a given key at a time. Commented Nov 18, 2015 at 17:36
  • It is not safe to obtain a global lock to do Contains and Add but then iterate the list outside that lock. Commented Nov 18, 2015 at 17:37
  • @MiltoxBeyond If I do that, I would lose a lot of throughput, because all the processing would bottleneck on the same queue. I could create one queue per key at run time, but that would change my whole structure (which is very large). Commented Nov 18, 2015 at 17:41
  • object key; lock (_lockObj) { if (!_lockListValues.Contains(value)) _lockListValues.Add(value); key = _lockListValues.First(x => x.Equals(value)); } lock (key) {...} Feel free to optimize it for the case where the value is found. Commented Nov 18, 2015 at 18:22
  • That fixes that problem, but without seeing your key removal code, you will likely remove the key before you should if another thread gets blocked. Commented Nov 18, 2015 at 18:27

2 Answers

Sorry, but no, this is not the way it should be done.

First, you talk about keys, but you store them as type object in a List and then search that list with LINQ to find them.

That is what a dictionary is for.

Second, regarding the object model: it is usually best to wrap the locking of an object in a class of its own, which keeps it nice and clean.

For example:

using System;
using System.Collections.Concurrent;


public class LockedObject<T>
{
    public readonly T data;
    public readonly int id;
    private readonly object obj = new object();

    // Public so that callers can actually construct the object.
    public LockedObject(int id, T data)
    {
        this.id = id;
        this.data = data;
    }

    //Usually, if you have Action related to some data,
    //it is better to receive
    //that data as parameter

    public void InvokeAction(Action<T> action)
    {
        lock(obj)
        {
            action(data);
        }
    }

}

//Now this is a thread-safe object: at most one action at a time
//runs on its data, no matter how the object is stored.
//Still, the best way to store it is in a concurrent dictionary:


ConcurrentDictionary<int, LockedObject<T>> dict =
new ConcurrentDictionary<int, LockedObject<T>>();

//You can insert, read, and remove objects concurrently.
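
As a minimal sketch of how the two pieces might be combined, the following looks up the wrapper for a key with GetOrAdd and then runs an action under that wrapper's lock. The LockedObject<T> here is a trimmed copy (without the id field) so that the sketch compiles on its own, and LockedObjectDemo, Run, and the key 42 are hypothetical names chosen for illustration. Note that entries are never removed in this sketch, so it does not by itself address the memory growth described in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Trimmed copy of the LockedObject<T> idea, repeated so the sketch is self-contained.
public sealed class LockedObject<T>
{
    public readonly T data;
    private readonly object obj = new object();

    public LockedObject(T data) { this.data = data; }

    // Runs the action while holding this object's private lock.
    public void InvokeAction(Action<T> action)
    {
        lock (obj) { action(data); }
    }
}

public static class LockedObjectDemo
{
    // Hypothetical helper: performs `workers` parallel increments on one key
    // and returns the final counter value.
    public static int Run(int workers)
    {
        var dict = new ConcurrentDictionary<int, LockedObject<int[]>>();

        Parallel.For(0, workers, i =>
        {
            // GetOrAdd may invoke the factory more than once under contention,
            // but every caller receives the single instance that was stored.
            LockedObject<int[]> entry =
                dict.GetOrAdd(42, k => new LockedObject<int[]>(new int[1]));

            // The increment is not atomic by itself; the per-key lock serializes it.
            entry.InvokeAction(counter => counter[0]++);
        });

        return dict[42].data[0];
    }
}
```

Because every increment goes through the same wrapper's lock, LockedObjectDemo.Run(n) should return n for any non-negative n.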

But the best is yet to come! :) You can make it lock-free, and very easily!

EDIT1:

ConcurrentInvoke is a dictionary-like collection for safely invoking actions on data from multiple threads. Only one action at a time can run for a given key.

using System;
using System.Threading;
using System.Collections.Concurrent;


public class ConcurrentInvoke<TKey, TValue>
{
    //we hate lock() :)

    private class Data<TData>
    {
        public readonly TData data;
        private int flag;
        private Data(TData data)
        {
            this.data = data;
        }
        public static bool Contains<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key)
        {
            return dict.ContainsKey(key);
        }
        public static bool TryAdd<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, TData data)
        {
            return dict.TryAdd(key, new Data<TData>(data));
        }
        // Cannot remove if:
        // the key does not exist,
        // a removal of the key is already in progress, or
        // an action on the key is in progress.
        public static bool TryRemove<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, Action<TTKey, TData> action_removed = null)
        {
            Data<TData> data = null;
            if (!dict.TryGetValue(key, out data)) return false;

            var access = Interlocked.CompareExchange(ref data.flag, 1, 0) == 0;
            if (!access) return false;

            Data<TData> data2 = null;
            var removed = dict.TryRemove(key, out data2);

            Interlocked.Exchange(ref data.flag, 0);

            if (removed && action_removed != null) action_removed(key, data2.data);
            return removed;
        }
        // Cannot invoke if:
        // the key does not exist,
        // a removal of the key is already in progress, or
        // another action on the key is in progress.
        public static bool TryInvokeAction<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, Action<TTKey, TData> invoke_action = null)
        {
            Data<TData> data = null;
            if (invoke_action == null || !dict.TryGetValue(key, out data)) return false;

            var access = Interlocked.CompareExchange(ref data.flag, 1, 0) == 0;
            if (!access) return false;

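            try
            {
                invoke_action(key, data.data);
            }
            finally
            {
                // Always release the flag, even if the action throws;
                // otherwise the key would stay marked as busy forever.
                Interlocked.Exchange(ref data.flag, 0);
            }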
            return true;
        }
    }

    private readonly ConcurrentDictionary<TKey, Data<TValue>> dict =
        new ConcurrentDictionary<TKey, Data<TValue>>();

    public bool Contains(TKey key)
    {
        return Data<TValue>.Contains(dict, key);
    }
    public bool TryAdd(TKey key, TValue value)
    {
        return Data<TValue>.TryAdd(dict, key, value);
    }
    public bool TryRemove(TKey key, Action<TKey, TValue> removed = null)
    {
        return Data<TValue>.TryRemove(dict, key, removed);
    }
    public bool TryInvokeAction(TKey key, Action<TKey, TValue> invoke)
    {
        return Data<TValue>.TryInvokeAction(dict, key, invoke);
    }
}




ConcurrentInvoke<int, string> concurrent_invoke = new ConcurrentInvoke<int, string>();

concurrent_invoke.TryAdd(1, "string 1");
concurrent_invoke.TryAdd(2, "string 2");
concurrent_invoke.TryAdd(3, "string 3");

concurrent_invoke.TryRemove(1);

concurrent_invoke.TryInvokeAction(3, (key, value) =>
{
    Console.WriteLine("InvokingAction[key: {0}, value: {1}]", key, value);
});
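
One thing worth noting about the flag technique above: TryInvokeAction never waits. If another thread currently holds the key, it returns false immediately. The following is a minimal sketch of that technique in isolation, plus one possible way to retry until the flag is free, which is closer to the waiting behaviour the question asks for. TryGate, TryRun, and RunWhenFree are hypothetical names and are not part of the class above.

```csharp
using System;
using System.Threading;

// Hypothetical minimal gate illustrating the CompareExchange flag technique.
public sealed class TryGate
{
    private int _flag; // 0 = free, 1 = busy

    // Runs the action only if no other caller holds the gate; never blocks.
    public bool TryRun(Action action)
    {
        // Atomically switch 0 -> 1; any other previous value means "busy".
        if (Interlocked.CompareExchange(ref _flag, 1, 0) != 0) return false;
        try
        {
            action();
        }
        finally
        {
            // Release the gate even if the action throws.
            Interlocked.Exchange(ref _flag, 0);
        }
        return true;
    }

    // Retries until the gate is free. SpinWait backs off progressively,
    // so this is a busy-wait that becomes gentler over time.
    public void RunWhenFree(Action action)
    {
        var spinner = new SpinWait();
        while (!TryRun(action)) spinner.SpinOnce();
    }
}
```

Unlike lock, this gate is not reentrant: a nested TryRun on the same gate from inside a running action returns false. Spinning also burns CPU while waiting, so for long-running actions a blocking primitive is probably the better fit.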

2 Comments

Your ConcurrentAccess<T> class is very clever. I like it! However, dict.Add(t); will not compile because ConcurrentDictionary<T> does not have a non-try Add method. You likely want AddOrUpdate, but be aware that this leaves a small window where someone could call TryInvokeAction, another caller calls TryAdd and gets a true result, and then the invoked action finishes, overwrites the added value, and also returns true. You could solve this problem with a second dictionary or a thread-safe HashSet<int> of keys that you check before calling TryAdd.
@ScottChamberlain Yes, you are right about TryAdd and key uniqueness; I assumed the keys were already unique! Yes, the second dictionary could work. That second class was a nice idea before sleep. I will improve it in an edit :). I think I found a possible use for it today in another scenario, just not as a static implementation... Thanks for thinking about it :).

Here is a robust and performant implementation of the Lock class, based on a ConcurrentDictionary<K,V> collection:

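using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class Lock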
{
    private static readonly ConcurrentDictionary<object, Entry> _entries = new();

    private readonly record struct Entry(object Locker, int RefCount);

    public static void Execute(object key, Action action)
    {
        object locker = GetLocker(key);
        bool lockTaken = false;
        try
        {
            Monitor.Enter(locker, ref lockTaken);
            action();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(locker);
            ReleaseLocker(key, locker);
        }
    }

    private static object GetLocker(object key)
    {
        Entry entry = _entries.AddOrUpdate(key,
            static _ => new Entry(new object(), 1),
            static (_, entry) => entry with { RefCount = entry.RefCount + 1 });
        return entry.Locker;
    }

    private static void ReleaseLocker(object key, object locker)
    {
        while (true)
        {
            bool exists = _entries.TryGetValue(key, out Entry entry);
            if (!exists)
                throw new InvalidOperationException("Key not found.");
            if (!ReferenceEquals(entry.Locker, locker))
                throw new InvalidOperationException("Unknown locker.");
            if (entry.RefCount > 1)
            {
                Entry newEntry = entry with { RefCount = entry.RefCount - 1 };
                if (_entries.TryUpdate(key, newEntry, entry))
                    break;
            }
            else
            {
                if (_entries.TryRemove(KeyValuePair.Create(key, entry)))
                    break;
            }
        }
    }
}

This implementation is based on an answer to a similar question: Asynchronous locking based on a key. You could look at that answer for a detailed explanation of how it works.

The two InvalidOperationException exceptions, "Key not found." and "Unknown locker.", should never be thrown. If any of these is ever thrown, it will be an indication of a logical error in the above implementation.
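
The retry loop in ReleaseLocker relies on TryUpdate and TryRemove acting as compare-and-swap operations: each succeeds only if the stored value still equals the value the caller last read. The following minimal sketch shows that behaviour on a plain dictionary of counters. CompareAndSwapDemo and its Run method are hypothetical names, and the TryRemove overload that takes a KeyValuePair is assumed to be available (it was added in .NET 5).

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class CompareAndSwapDemo
{
    // Returns the outcome of four conditional operations on a one-entry dictionary.
    public static (bool StaleUpdate, bool FreshUpdate, bool StaleRemove, bool FreshRemove) Run()
    {
        var counts = new ConcurrentDictionary<string, int>();
        counts["a"] = 2;

        // Fails: the comparison value 1 does not match the stored value 2.
        bool staleUpdate = counts.TryUpdate("a", 0, 1);

        // Succeeds: the stored value is still 2, so it becomes 1.
        bool freshUpdate = counts.TryUpdate("a", 1, 2);

        // Fails: the entry is now ("a", 1), not ("a", 2).
        bool staleRemove = counts.TryRemove(KeyValuePair.Create("a", 2));

        // Succeeds: both key and value match, so the entry is removed.
        bool freshRemove = counts.TryRemove(KeyValuePair.Create("a", 1));

        return (staleUpdate, freshUpdate, staleRemove, freshRemove);
    }
}
```

When one of these calls fails in ReleaseLocker, it means another thread changed the entry in the meantime, so the loop reads the entry again and retries with the fresh value.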
