Отправка/создание json-сообщения через кафку

0

Вопрос

Это мой первый раз, когда я использую Кафку, и я планирую использовать кафку с .net

Я хотел знать, могу ли я отправлять JSON в качестве сообщения при создании события

Я следую учебнику: https://developer.confluent.io/get-started/dotnet/#build-producer

Кроме того, есть ли способ сопоставить это значение с моделью, чтобы структура value/json всегда была привязана к этой модели

Так, например: если я хочу, чтобы мое значение json было

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

Большинство примеров, которые я могу найти, похожи на это:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

Я бы хотел, чтобы элемент был классом в структуре json.

.net apache-kafka asp.net-core
2021-11-23 21:53:21
1

Лучший ответ

0

хотел знать, могу ли я отправлять JSON в качестве сообщения при создании события

Да. Кафка хранит байты и преобразует байты с помощью сериализаторов. При создании производителя у вас есть возможность позвонить SetValueSerializer.

Некоторые из встроенных сериализаторов можно найти по адресу- https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Кафка/Сериализаторы.cs

Вам нужно будет написать свой собственный, чтобы в целом обрабатывать любые типы моделей JSON.

При использовании сериализатора UTF8 для строк вам потребуется предварительно сериализовать объект из класса модели, а затем отправить его в качестве значения. В вашем примере вы бы заменили var item с некоторым сериализованным объектом.

Как превратить объект C# в строку JSON в .NET?

При использовании классов моделей ваши данные, как правило, будут строго типизированы до тех пор, пока вы не начнете вручную писать JSON или использовать типы словарей. Если вам нужна проверка внешних сообщений, реестр схем слияния является одним из примеров, который поддерживает JSONSchema и JsonSerializer От confluent-dotnet-kafka проект поддерживает это.

2021-11-23 22:27:28

Просто последующий вопрос. Знаете ли вы, могу ли я ограничить размер сообщения и есть ли способ для производителя проверить, каков размер события перед отправкой, и не разрешать отправлять сообщение, если размер превышает ограничение?
Learn AspNet

У Кафки есть ограничение по умолчанию в 1 МБ пакетов сообщений. Если вы получите размер сериализованного массива байтов, это должно быть близкое приближение к размеру отдельной записи (однако есть дополнительные накладные расходы, такие как заголовки записей и метки времени).
OneCricketeer

Спасибо. Не могли бы вы ответить, пожалуйста: stackoverflow.com/questions/70097676/...
Learn AspNet

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................