Menu Docs
Página inicial do Docs
/
Atlas
/ /

$merge (Processamento de Stream)

A fase $merge especifica uma conexão no registro de conexão para gravar mensagens. A conexão deve ser uma conexão do Atlas.

Um estágio de pipeline do $merge tem a seguinte forma de protótipo:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge | pipeline",
"whenNotMatched": "insert | discard",
"parallelism": <integer>
}
}

A versão Atlas Stream Processing do $merge usa a maioria dos mesmos campos que a versão Atlas Data Federation. O Atlas Stream Processing também utiliza os seguintes campos que são exclusivos de sua implementação de $merge ou foram modificados para se adequar a ela. Para aprender mais sobre os campos compartilhados com o Atlas Data Federation $merge, consulte sintaxe $merge.

Campo
necessidade
Descrição

into

Obrigatório

Simplificado para reproduzir o suporte do Atlas Stream Processing para $merge apenas em conexões do Atlas.

Para aprender mais, consulte esta descrição dos campos do Atlas Data Federation $merge.

parallelism

Condicional

Número de threads para distribuir operações de gravação. Deve ser um valor inteiro entre 1 e 16. Valores mais elevados de paralelismo aumentam a taxa de transferência. No entanto, valores mais altos também exigem que o processador de fluxo e o cluster para o qual ele grava usem mais recursos computacionais.

Se você utilizar um valor de expressão dinâmica para into.coll ou into.db, não poderá definir esse valor como maior que 1.

$merge deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge por pipeline.

O campo on tem requisitos especiais para $merge em relação a coleções fragmentadas. Para saber mais, consulte Sintaxe $merge.

Se você utilizar um valor de expressão dinâmica para into.coll ou into.db, não poderá definir um valor de parallelism superior a 1.

Você pode usar uma expressão dinâmica como o valor dos seguintes campos:

  • into.db

  • into.coll

Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge :

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

Este estágio $merge :

  • Escreve a mensagem Very Important Industries para uma collection Atlas denominada VIP.subscription.

  • Escreve a mensagem N. E. Buddy para uma collection Atlas denominada employee.requisition.

  • Escreve a mensagem Khan Traktor para uma collection Atlas denominada contractor.billableHours.

Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um banco de dados ou coleção com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues, se configurada, e processará as mensagens subsequentes. Se não houver nenhuma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.

Para salvar dados de streaming de múltiplos tópicos do Apache Kafka em coleções no seu cluster Atlas, utilize o estágio $merge com o estágio $source. A etapa $source especifica os tópicos dos quais ler dados. O estágio $merge grava os dados na coleção de destino.

Use a seguinte sintaxe:

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $match exclui documentos que têm um dewPoint.value menor ou igual a 5.0 e passa os documentos com dewPoint.value maior que 5.0 para o próximo estágio.

  3. O estágio $merge grava a saída na coleção do Atlas chamada stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$emit

Nesta página

OSZAR »