def process_writes(influxdb3_local, table_batches, args=None): for table_batch in table_batches: table_name = table_batch['table_name'] rows = table_batch['rows'] influxdb3_local.info(f'processed {len(rows)} row(s) from {table_name}') if table_name == 'weather': for row in rows: room = row.get('room', 'unknown') value = row.get('temperature') line = LineBuilder('weather_processed') line.tag('room', str(room)) line.string_field('source_table', table_name) line.float64_field('value', float(value)) influxdb3_local.write(line)