using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace BCMToolbox
{
    /// <summary>
    /// Connects Hebbian3 networks with message pipes and handles execution in parallel.
    /// One network and one background thread are created per requested processor;
    /// every thread repeatedly calls <c>Propagate()</c> on its network and then waits
    /// until all other threads have finished the same step.
    /// </summary>
    public abstract class ParallelHebbian3Executive : IDisposable
    {
        /// <summary>
        /// Classes which inherit this build different network configurations.
        /// </summary>
        /// <returns>A new network instance; called once per processor by <see cref="Initialize"/>.</returns>
        protected abstract SynchronousHebbian3Network BuildNetwork();

        int __processorCount = 0;
        List<SynchronousHebbian3Network> __networks = new List<SynchronousHebbian3Network>();
        List<Thread> __executionThreads = new List<Thread>();
        bool __started = false;

        // Number of workers currently parked at the end-of-step barrier.
        int __waiting = 0;

        // Incremented each time the barrier opens, so a parked worker can tell a
        // genuine release apart from a spurious wake-up.
        int __generation = 0;

        // Set by Stop(); read by workers outside the lock, hence volatile.
        volatile bool __stopRequested = false;

        // Private monitor for the barrier. Locking on 'this' would let any outside
        // code holding a reference to the executive stall or deadlock the workers.
        readonly object __barrierGate = new object();

        object __syncRoot = new object();

        /// <summary>
        /// Creates an executive that will run <paramref name="newProcessorCount"/> networks in parallel.
        /// </summary>
        /// <param name="newProcessorCount">Number of networks (and worker threads) to create.</param>
        public ParallelHebbian3Executive(int newProcessorCount)
        {
            __processorCount = newProcessorCount;
        }

        /// <summary>
        /// Creates an executive with one network per logical processor
        /// (<see cref="Environment.ProcessorCount"/>).
        /// </summary>
        public ParallelHebbian3Executive()
            : this(Environment.ProcessorCount)
        {
        }

        /// <summary>
        /// Builds the networks, links every pair of networks with a pipe and creates
        /// (but does not start) one background thread per network.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The executive has already been initialized. A second call would add more
        /// networks and threads than the barrier in the worker loop accounts for.
        /// </exception>
        public void Initialize()
        {
            if (__networks.Count != 0)
                throw new InvalidOperationException("The executive has already been initialized.");

            for (int i = 0; i < __processorCount; i++)
                __networks.Add(BuildNetwork());

            // bind message-passing pipes: each unordered pair (i, j) gets one pipe,
            // registered on network i and linked into network j
            for (int i = 0; i < __networks.Count; i++)
            {
                for (int j = i + 1; j < __networks.Count; j++)
                {
                    LocalPipe pipe = __networks[i].RegisterOtherNetwork(__networks[j]);
                    __networks[j].LinkPipe(pipe);
                }
            }

            // create execution threads
            for (int i = 0; i < __networks.Count; i++)
            {
                Thread th = new Thread(new ParameterizedThreadStart(this.runNetwork));
                th.IsBackground = true;
                __executionThreads.Add(th);
            }
        }

        /// <summary>
        /// An object callers may lock on. It is not used by the executive itself.
        /// </summary>
        public object SyncRoot
        {
            get { return __syncRoot; }
        }

        /// <summary>
        /// Thread body: propagates one network step, then waits for all other
        /// networks to finish theirs, until <see cref="Stop"/> is called.
        /// </summary>
        /// <param name="net">The <c>SynchronousHebbian3Network</c> this thread drives.</param>
        void runNetwork(object net)
        {
            // Direct cast instead of 'as': an argument of the wrong type fails here
            // with InvalidCastException rather than later as a NullReferenceException.
            SynchronousHebbian3Network network = (SynchronousHebbian3Network)net;

            try
            {
                while (!__stopRequested)
                {
                    network.Propagate();
                    awaitOtherNetworks();
                }
            }
            catch (ThreadInterruptedException)
            {
                // Stop() interrupts workers that are blocked; leaving the loop is
                // the intended reaction.
            }
        }

        /// <summary>
        /// Barrier at the end of a propagation step. The last worker to arrive
        /// releases everyone; earlier arrivals wait under the same monitor, so a
        /// release can never slip in between "decide to wait" and "start waiting"
        /// (which the previous Set()/Reset() pulse on an event allowed).
        /// </summary>
        void awaitOtherNetworks()
        {
            lock (__barrierGate)
            {
                if (__waiting == (__processorCount - 1)) // all have finished
                {
                    __waiting = 0;
                    __generation++;
                    Monitor.PulseAll(__barrierGate);
                    return;
                }

                __waiting++;
                int arrivalGeneration = __generation;

                // Loop guards against spurious wake-ups; a stop request also releases.
                while (arrivalGeneration == __generation && !__stopRequested)
                    Monitor.Wait(__barrierGate);
            }
        }

        /// <summary>
        /// Number of processors (networks) this executive was configured with.
        /// </summary>
        public int Count
        {
            get { return __processorCount; }
        }

        /// <summary>
        /// Starts all execution threads created by <see cref="Initialize"/>.
        /// </summary>
        /// <exception cref="NotSupportedException">Execution has already been started.</exception>
        public void Start()
        {
            if (__started)
                throw new NotSupportedException("Execution has already started.");

            for (int i = 0; i < __executionThreads.Count; i++)
                __executionThreads[i].Start(__networks[i]);

            __started = true;
        }

        /// <summary>
        /// Asks all execution threads to terminate. Does nothing if execution was
        /// never started. The call does not wait for the threads to exit; a thread
        /// that is busy inside <c>Propagate()</c> finishes that call first unless it
        /// blocks, in which case the pending interrupt ends it.
        /// </summary>
        public void Stop()
        {
            if (!__started)
                return;

            lock (__barrierGate)
            {
                __stopRequested = true;
                Monitor.PulseAll(__barrierGate); // release workers parked at the barrier
            }

            // IsAlive instead of comparing ThreadState for equality: ThreadState is a
            // flags enum and these threads always carry the Background bit, so the
            // equality tests against Running / WaitSleepJoin could never succeed.
            // Interrupt instead of Abort: Abort is obsolete and unsupported on
            // .NET Core and later.
            foreach (Thread worker in __executionThreads)
            {
                if (worker.IsAlive)
                    worker.Interrupt();
            }
        }

        #region IDisposable Members

        /// <summary>
        /// Stops the execution threads if they were started.
        /// </summary>
        public void Dispose()
        {
            if (__started)
                Stop();
        }

        #endregion
    }
}