Thread safety of messages used by event sinks


I’m using the RO SDK in Delphi and in particular the event system so that my server can notify clients of various situations as they happen.
Internally, there are multiple instances of an object representing the client that gets created like this:

NewClient := TClient.Create(Session.SessionID, ServerDataModule.ROInMemoryEventRepository as IMyClientEventSink_Writer);

The TClient class stores the two parameters as private fields and uses them when the time comes. As the server is multithreaded, this may lead to situations where two TClient instances do this at the same time:

FEventSinkWriter.SomethingHappened(FSessionId, SomeDetails);

With about 5 clients, this works just fine. But now that we are pushing the system to the limits with about 100 clients, we start seeing strange things like access violations inside TROBinMessage.WriteStream being called by TROBinMessage.WriteToStream. Analyzing the code, it’s as if fStream is nil when the call is made.
This all seemed very strange to me, but in the context of a heavily multithreaded server, it occurred to me that the message used by ServerDataModule.ROInMemoryEventRepository might actually be shared by all the TClient instances. This can even be seen inside TROEventRepository.QueryInterface:

eventwriter := eventwriterclass.Create(fMessage, Self);

So, if my analysis is correct, what can be done to avoid this multithreading issue? Is there a method inside TROEventRepository that would return a registered writer class instance while cloning the original message?
Or should I write my own encapsulating class that clones the message and takes care of the lifetime of the clone while also implementing IMyClientEventSink_Writer by proxying it to a registered writer class instance?

Thanks a lot for your inputs on this matter.

Ok, I knew I should have investigated further before posting, as there is this code inside TROEventWriter.Create:

inherited Create(aMessage, TRUE);

the TRUE here means that the message gets cloned, so my entire theory above is moot.

That being said, there is also the case in my own code which may call in parallel many methods that all access the same FEventSinkWriter writer.
If I’m correct, the SDK made sure that each event sink writer instance has no impact on any other by cloning the message, but the instance itself is not thread safe. This means I have to do the protection myself, most likely by using TMonitor.Enter/TMonitor.Exit.
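
A minimal sketch of that idea, assuming nothing about the SDK itself: TFakeWriter and TGuardedClient are hypothetical stand-ins for the generated writer and for TClient, and the only point is that every call on the shared writer instance goes through TMonitor.Enter/TMonitor.Exit on that instance.

```delphi
program MonitorGuardSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

type
  // Hypothetical stand-in for a generated event sink writer. Like the real
  // one, it keeps per-instance state that must not be used concurrently.
  TFakeWriter = class
  private
    FCalls: Integer;
  public
    procedure SomethingHappened(const ADetails: string);
    property Calls: Integer read FCalls;
  end;

  // Hypothetical client that serializes every call on its shared writer.
  TGuardedClient = class
  private
    FWriter: TFakeWriter;
  public
    constructor Create;
    destructor Destroy; override;
    procedure NotifySomethingHappened(const ADetails: string);
    property Writer: TFakeWriter read FWriter;
  end;

procedure TFakeWriter.SomethingHappened(const ADetails: string);
begin
  Inc(FCalls); // unsynchronized on purpose; the caller holds the lock
end;

constructor TGuardedClient.Create;
begin
  inherited Create;
  FWriter := TFakeWriter.Create;
end;

destructor TGuardedClient.Destroy;
begin
  FWriter.Free;
  inherited;
end;

procedure TGuardedClient.NotifySomethingHappened(const ADetails: string);
begin
  TMonitor.Enter(FWriter); // one thread at a time per writer instance
  try
    FWriter.SomethingHappened(ADetails);
  finally
    TMonitor.Exit(FWriter);
  end;
end;

var
  Client: TGuardedClient;
  Workers: array[0..3] of TThread;
  I: Integer;
begin
  Client := TGuardedClient.Create;
  try
    for I := Low(Workers) to High(Workers) do
    begin
      Workers[I] := TThread.CreateAnonymousThread(
        procedure
        var
          J: Integer;
        begin
          for J := 1 to 1000 do
            Client.NotifySomethingHappened('Details');
        end);
      Workers[I].FreeOnTerminate := False; // keep the object so WaitFor is safe
      Workers[I].Start;
    end;
    for I := Low(Workers) to High(Workers) do
    begin
      Workers[I].WaitFor;
      Workers[I].Free;
    end;
    Writeln('Calls recorded: ', Client.Writer.Calls);
  finally
    Client.Free;
  end;
end.
```

The lock serializes all events of one client, so it trades throughput for safety; clients with their own writer instances still run in parallel.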

What do you think?


Let’s review a simple case.

procedure TNewEventSink_Writer.NewMethod(const __Sender: TGUID);
var
  __eventdata: Binary;
  lMessage: IROMessage;
begin
  __eventdata := Binary.Create();
  lMessage := __Message;
  try
    lMessage.InitializeEventMessage(nil, 'NewLibrary', EID_NewEventSink, 'NewMethod');
    lMessage.WriteToStream(__eventdata);

    Repository.StoreEventData(__Sender, __eventdata, ExcludeSender, ExcludeSessionList, SessionList.CommaText, EID_NewEventSink);
  finally
    __eventdata.Free();
    lMessage := nil;
  end;
end;

As you can see, __eventdata (i.e. the stream) is created at run time and destroyed at the end of the method,
so it cannot be nil at the lMessage.WriteToStream(__eventdata); line.

I think I need some kind of test case to review this corner case.
You can attach it here, or email it to support@ if you want to keep it private.

__eventdata is passed as aStream to TROBinMessage.WriteToStream
In turn, fStream is given as Source and aStream is given as Destination to TROBinMessage.WriteStream

Now, in the issue at hand, it is Source that is nil and that triggers the access violations we get.
I totally agree that __eventdata can never be nil but I’m convinced that fStream can get this value if we have multiple threads working on the same instance of the event writer.
And inside the bug reports we get, we have multiple threads whose stack trace is right into TROBinMessage methods, especially those that set fStream to nil.

A colleague suggested changing TROEventProxy._GetMessage from this:

Result := fMessage;

to this:

  Result := (fMessage as IROMessageCloneable).Clone;

We tested it quickly here and it appears to work fine, but this cannot be put in production because not all instances of IROMessage may implement IROMessageCloneable.
And I also have my doubts about memory management because of this comment inside TROMessage.CreateRefCountedClone:

//Todo: implement reference counting;

What do you think?


IMyClientEventSink_Writer is created in TROEventRepository.QueryInterface as

eventwriter := eventwriterclass.Create(fMessage, Self);

constructor TROEventWriter.Create(const aMessage : IROMessage;
                       const aRepository : IROEventRepository);
begin
  inherited Create(aMessage, TRUE);

constructor TROEventProxy.Create(const aMessage: IROMessage; CloneMessage: boolean);
begin
  if CloneMessage then begin
    fMessage := (aMessage as IROMessageCloneable).Clone; //<<<<<<<<<<<<

i.e. fMessage is already cloned, so you shouldn’t clone it a second time as you do in

  Result := (fMessage as IROMessageCloneable).Clone;

The concern that some IROMessage instances may not implement IROMessageCloneable is incorrect, because all messages are cloneable by default:

  TROMessage = class(TROComponent,
                     IUnknown, IROMessage, IROMessageCloneable, IROModuleInfo)

The extra clone may cost some speed, but it should not create any memory leaks.

Well, I’m sorry, but when all I receive is an IROMessage instance, I do not know which class implements the interface, and as such I have no knowledge of the other interfaces it may implement.
Granted, in the SDK, there are no message classes that do not inherit from TROMessage but there is no guarantee this will stay forever. It’s the basic principle behind using interfaces, one should only work with what the interface provides and not make any assumptions about the underlying class.
Now, if that property were changed to IROMessageCloneable, that would be an altogether different story. Even better, I would also make IROMessageCloneable inherit from IROMessage so that pretty much any variable of type IROMessage could be replaced by IROMessageCloneable, avoiding the use of the as operator in many places. Obviously, the Clone method would also return an IROMessageCloneable instance.
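
Until the property type changes, one way to avoid assuming anything about the implementing class is to probe for the interface with SysUtils.Supports rather than cast with as. The sketch below is only an illustration: IDemoMessage, IDemoMessageCloneable, both classes and the GUIDs are invented stand-ins, not SDK types.

```delphi
program SupportsCloneSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  // Hypothetical stand-ins for IROMessage / IROMessageCloneable.
  // The GUIDs are invented for this sketch.
  IDemoMessage = interface
    ['{0B6D7C52-3E0F-4B1A-9E57-6A2C1D4F8A01}']
    function Describe: string;
  end;

  IDemoMessageCloneable = interface
    ['{0B6D7C52-3E0F-4B1A-9E57-6A2C1D4F8A02}']
    function Clone: IDemoMessage;
  end;

  TCloneableMessage = class(TInterfacedObject, IDemoMessage, IDemoMessageCloneable)
  public
    function Describe: string;
    function Clone: IDemoMessage;
  end;

  // A message class that deliberately does not support cloning.
  TPlainMessage = class(TInterfacedObject, IDemoMessage)
  public
    function Describe: string;
  end;

function TCloneableMessage.Describe: string;
begin
  Result := 'cloneable';
end;

function TCloneableMessage.Clone: IDemoMessage;
begin
  Result := TCloneableMessage.Create;
end;

function TPlainMessage.Describe: string;
begin
  Result := 'plain';
end;

// Returns a private copy when the message supports cloning, nil otherwise,
// so the caller can pick a fallback instead of hitting a failed "as" cast.
function TryCloneMessage(const AMessage: IDemoMessage): IDemoMessage;
var
  Cloneable: IDemoMessageCloneable;
begin
  if Supports(AMessage, IDemoMessageCloneable, Cloneable) then
    Result := Cloneable.Clone
  else
    Result := nil;
end;

var
  Source: IDemoMessage;
begin
  Source := TCloneableMessage.Create;
  Writeln('cloneable message copied: ', TryCloneMessage(Source) <> nil);
  Source := TPlainMessage.Create;
  Writeln('plain message copied: ', TryCloneMessage(Source) <> nil);
end.
```

What the fallback should be when cloning is not supported (serialize access, refuse the call, and so on) is a separate design decision that the sketch leaves open.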

Well, yes, if I am to use the event writer in only one thread at a time. To illustrate, here is the situation:

Client1 -> EventSinkWriter1 -> MessageClone1
Client2 -> EventSinkWriter2 -> MessageClone2
Client3 -> EventSinkWriter3 -> MessageClone3

ClientN -> EventSinkWriterN -> MessageCloneN

As already discussed, each event sink has its clone of the original message. So if I have multiple threads each working with a single client, there is no issue whatsoever.

But if I have many threads all calling the same event sink, then there are multiple crashes, as can be seen in the attached sample application.

After having compiled the client and server application, debug the server and start the client application on the side.
Now, click Connect then Classic event storm. This gives 51 events, arriving out of order as expected with events.
But, if you now click on Parallel event storm, then the server creates multiple threads that all call TClient.NotifySomethingHappened at the same time, which leads to parallel usage of the message clone held by the FEventSinkWriter instance inside TClient.
And there, you will see crashes all over the place because the calls are stepping on each other’s toes.

Now, if inside TMyClientEventSink_Writer.SomethingHappened we make sure that lMessage is a unique instance, then we have no parallel issue. One way to make this happen is to change the _GetMessage method as suggested above. However, this prevents the client from processing the events, as it too uses TROEventProxy inside TROEventReceiver. So the real solution is to change the code inside TMyClientEventSink_Writer.SomethingHappened so that lMessage receives __Message.Clone, but this has one drawback: the change has to be redone after each regeneration of the Invk file. Fortunately, in my case, this does not happen that often.

So, to sum up, I believe the fix requires the generator for event invocations to be changed to make sure lMessage receives __Message.Clone. To make this easier, here are the steps that I’m suggesting (see the patch file):

  1. Modify IROMessageCloneable so that it inherits from IROMessage
  2. Modify IROMessageCloneable.Clone so that it returns IROMessageCloneable
  3. Adapt TROMessage.Clone to return IROMessageCloneable (simply change the type and remove the as cast)
  4. Have TROEventProxy use IROMessageCloneable instead of IROMessage everywhere
  5. Adapt TROEventWriter.Create to the above change
  6. Adapt TROEventInvoker.Create to the above change
  7. Inside TROEventReceiver.FireEvents, change the type of the messageclone variable to IROMessageCloneable
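
The interface shape behind steps 1 to 3 could be sketched as follows. This is not the SDK source: IDemoMessage, IDemoMessageCloneable, TDemoMessage and the GUIDs are hypothetical, and the sketch only shows that once the cloneable interface descends from the message interface, a clone can be taken and used without any as cast.

```delphi
program CloneableInheritanceSketch;

{$APPTYPE CONSOLE}

type
  // Hypothetical stand-in for IROMessage; the GUIDs are invented.
  IDemoMessage = interface
    ['{4C1E9A70-5B2D-4F63-8A19-7E3D2C6B9F01}']
    procedure SetPayload(const AValue: string);
    function GetPayload: string;
  end;

  // Steps 1 and 2: the cloneable interface extends the message interface
  // and Clone returns the cloneable type itself.
  IDemoMessageCloneable = interface(IDemoMessage)
    ['{4C1E9A70-5B2D-4F63-8A19-7E3D2C6B9F02}']
    function Clone: IDemoMessageCloneable;
  end;

  TDemoMessage = class(TInterfacedObject, IDemoMessage, IDemoMessageCloneable)
  private
    FPayload: string;
  public
    procedure SetPayload(const AValue: string);
    function GetPayload: string;
    function Clone: IDemoMessageCloneable;
  end;

procedure TDemoMessage.SetPayload(const AValue: string);
begin
  FPayload := AValue;
end;

function TDemoMessage.GetPayload: string;
begin
  Result := FPayload;
end;

// Step 3: the class returns the cloneable interface directly.
function TDemoMessage.Clone: IDemoMessageCloneable;
var
  Duplicate: TDemoMessage;
begin
  Duplicate := TDemoMessage.Create;
  Duplicate.FPayload := FPayload;
  Result := Duplicate;
end;

var
  Original, PrivateCopy: IDemoMessageCloneable;
begin
  Original := TDemoMessage.Create;
  Original.SetPayload('shared');

  PrivateCopy := Original.Clone;        // no "as" cast needed
  PrivateCopy.SetPayload('per call');   // message methods are available too

  Writeln(Original.GetPayload);
  Writeln(PrivateCopy.GetPayload);
end.
```

Note that the class still lists both interfaces explicitly, which keeps a QueryInterface or Supports lookup for the base interface working as before.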

And after those changes are in, I believe a review is in order to change quite a few IROMessage references to IROMessageCloneable so as to avoid the costly as operator calls, in particular those in code paths that are called quite often.

Your TClient class isn’t thread-safe because it accesses the internal message simultaneously from different threads.

As a fix, you can create the event sink writer on the fly:

  IterationProc :=
    procedure (AIteration: Integer)
    var
      FEventSinkWriter: IMyClientEventSink_Writer;
    begin
      FEventSinkWriter := (ServerDataModule.ROInMemoryEventRepository as IMyClientEventSink_Writer);
      FEventSinkWriter.ExcludeSender := False;
      FEventSinkWriter.SessionList.Text := ClientID.ToString;
      FEventSinkWriter.SomethingHappened(ClientID, 'Details: ' + AIteration.ToString);
    end;

In this case you don’t need to protect the code, because it is already thread-safe.

Well, that’s what I ended up doing, but your assertion that it is threadsafe is wrong. Indeed, TROEventRepository uses a TDictionary<> instance to store the registered event sink writer classes but the dictionary class itself is not threadsafe, even for reading its content when nothing changes.
So in the end, I had to write it like this:

function TClient.GetNewEventSinkWriter: IMyClientEventSink_Writer;
  // QueryInterface inside TROEventRepository uses TDictionary which is not threadsafe, even when only reading inside it.
    Result := FEventRepository as IMyClientEventSink_Writer;
    Result.ExcludeSender := False;
    Result.SessionList.Text := SessionId.ToString;

Yes, it may seem very strange that TDictionary<> is not threadsafe when reading, but I’ve already been bitten by it in the past and it also bit me in the real world scenario for which the example above has been created.
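
For readers who want the general shape of such a guard, here is a self-contained sketch under stated assumptions. TDemoRepository, TDemoWriter, IDemoWriter and the 'demo' key are hypothetical and only mimic the description above (a dictionary lookup followed by the creation of a fresh writer); using the repository object itself as the TMonitor lock is also an assumption, not necessarily what the real TClient does.

```delphi
program PerCallWriterSketch;

{$APPTYPE CONSOLE}

uses
  System.Generics.Collections;

type
  // Hypothetical stand-in for a generated event sink writer interface.
  IDemoWriter = interface
    ['{9A3F0E14-2C7B-4D58-B6A1-5F8E1D3C7A01}']
    procedure SomethingHappened(const ADetails: string);
  end;

  TDemoWriter = class(TInterfacedObject, IDemoWriter)
  public
    procedure SomethingHappened(const ADetails: string);
  end;

  TDemoWriterClass = class of TDemoWriter;

  // Hypothetical repository: an unguarded dictionary lookup followed by
  // the creation of a fresh writer instance.
  TDemoRepository = class
  private
    FWriterClasses: TDictionary<string, TDemoWriterClass>;
  public
    constructor Create;
    destructor Destroy; override;
    function NewWriter(const AName: string): IDemoWriter;
  end;

procedure TDemoWriter.SomethingHappened(const ADetails: string);
begin
  Writeln('event: ', ADetails);
end;

constructor TDemoRepository.Create;
begin
  inherited Create;
  FWriterClasses := TDictionary<string, TDemoWriterClass>.Create;
  FWriterClasses.Add('demo', TDemoWriter);
end;

destructor TDemoRepository.Destroy;
begin
  FWriterClasses.Free;
  inherited;
end;

function TDemoRepository.NewWriter(const AName: string): IDemoWriter;
var
  WriterClass: TDemoWriterClass;
begin
  if FWriterClasses.TryGetValue(AName, WriterClass) then
    Result := WriterClass.Create
  else
    Result := nil;
end;

// Only the lookup is serialized; the returned writer is private to the
// caller and can be used without further locking.
function GetNewWriter(ARepository: TDemoRepository): IDemoWriter;
begin
  TMonitor.Enter(ARepository);
  try
    Result := ARepository.NewWriter('demo');
  finally
    TMonitor.Exit(ARepository);
  end;
end;

var
  Repository: TDemoRepository;
  Writer: IDemoWriter;
begin
  Repository := TDemoRepository.Create;
  try
    Writer := GetNewWriter(Repository);
    Writer.SomethingHappened('Details: 1');
    Writer := nil; // release the writer before the repository goes away
  finally
    Repository.Free;
  end;
end.
```

The lock is held only for the duration of the lookup, so the contention cost stays small compared with serializing the whole event call.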

I’ll review the possibility of protecting the global dictionaries with TMultiReadExclusiveWriteSynchronizer.
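
As an illustration of what such protection could look like (this is not the actual fix that went into the SDK), here is a small sketch of a dictionary wrapped in a TMultiReadExclusiveWriteSynchronizer. TGuardedRegistry and its methods are hypothetical names.

```delphi
program GuardedRegistrySketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections;

type
  // Hypothetical registry standing in for a global dictionary of
  // registered classes.
  TGuardedRegistry = class
  private
    FLock: TMultiReadExclusiveWriteSynchronizer;
    FItems: TDictionary<string, Integer>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Add(const AKey: string; AValue: Integer);
    function TryFind(const AKey: string; out AValue: Integer): Boolean;
  end;

constructor TGuardedRegistry.Create;
begin
  inherited Create;
  FLock := TMultiReadExclusiveWriteSynchronizer.Create;
  FItems := TDictionary<string, Integer>.Create;
end;

destructor TGuardedRegistry.Destroy;
begin
  FItems.Free;
  FLock.Free;
  inherited;
end;

procedure TGuardedRegistry.Add(const AKey: string; AValue: Integer);
begin
  FLock.BeginWrite; // exclusive: waits for readers and other writers
  try
    FItems.AddOrSetValue(AKey, AValue);
  finally
    FLock.EndWrite;
  end;
end;

function TGuardedRegistry.TryFind(const AKey: string; out AValue: Integer): Boolean;
begin
  FLock.BeginRead; // shared: no writer can modify the dictionary meanwhile
  try
    Result := FItems.TryGetValue(AKey, AValue);
  finally
    FLock.EndRead;
  end;
end;

var
  Registry: TGuardedRegistry;
  Value: Integer;
begin
  Registry := TGuardedRegistry.Create;
  try
    Registry.Add('writer', 1);
    if Registry.TryFind('writer', Value) then
      Writeln('found: ', Value)
    else
      Writeln('not found');
  finally
    Registry.Free;
  end;
end.
```

Compared with a plain TMonitor, this lets lookups proceed side by side and only blocks them while a registration is in progress, which suits a dictionary that is written rarely and read often.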

Thanks, logged as bugs://85325

bugs://85325 got closed with status fixed.