Please see my first Persistent Subscription below:
namespace PersistentSubscription
{
internal class Program
{
private static void Main()
{
var subscription = new PersistentSubscriptionClient();
subscription.Start();
}
}
public class PersistentSubscriptionClient
{
private IEventStoreConnection _conn;
private const string STREAM = "$ce-customer";
private const string GROUP = "a_test_group";
private const int DEFAULTPORT = 1113;
private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
private EventStorePersistentSubscriptionBase _subscription;
public void Start()
{
var settings = ConnectionSettings.Create();
using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
{
_conn.ConnectAsync().Wait();
CreateSubscription();
ConnectToSubscription();
Console.WriteLine("waiting for events. press enter to exit");
Console.ReadLine();
}
}
private void ConnectToSubscription()
{
var bufferSize = 10;
var autoAck = true;
Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared;
_subscription = _conn.ConnectToPersistentSubscription(STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
}
private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
SubscriptionDropReason subscriptionDropReason, Exception ex)
{
ConnectToSubscription();
}
private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
ResolvedEvent resolvedEvent)
{
MemoryStream stream = new MemoryStream(resolvedEvent.Event.Data);
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
try
{
CustomerCreated customerCreated = (CustomerCreated)formatter.Deserialize(stream);
Console.WriteLine(customerCreated);
}
catch (Exception e)
{
var test = "test";
}
}
private void CreateSubscription()
{
PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
.DoNotResolveLinkTos()
.StartFromCurrent();
try
{
_conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
}
catch (AggregateException ex)
{
if (ex.InnerException.GetType() != typeof(InvalidOperationException)
&& ex.InnerException?.Message != $"Subscription group {GROUP} on stream {STREAM} already exists")
{
throw;
}
}
}
}
}
and my first client below:
using System;
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using EventStore.ClientAPI;
namespace WritingEvents
{
class Program
{
static void Main(string[] args)
{
const int DEFAULTPORT = 1113;
var settings = ConnectionSettings.Create();
using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
{
conn.ConnectAsync().Wait();
CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
EventData customerCreated1 = GetEventDataFor(c1);
conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
}
}
private static EventData GetEventDataFor(CustomerCreated customerCreated)
{
IFormatter formatter = new BinaryFormatter();
MemoryStream stream = new MemoryStream();
formatter.Serialize(stream, customerCreated);
byte[] customerCreatedEventByteArray = stream.ToArray();
return new EventData(
Guid.NewGuid(),
"eventType",
true,
customerCreatedEventByteArray,
null
);
}
}
[Serializable]
public class CustomerCreated
{
public Guid Id { get; set; }
public string Name { get; set; }
}
}
I run the server and then then the client. I see an error when deserializing the CustomerCreated event on the server side. The error is: "End of stream was encountered before parsing was completed".
If I change this line:
private const string STREAM = "$ce-customer";
to this:
private const string STREAM = "customer-100";
Then deserialization works correctly on the server side.
How do I handle all customer events - not just customer 100?
I have --run-projections=all
when starting Event Store. I have also enabled all projections: