# Analytics Data Collector
## Overview

The analytics data collector is a specialized background task within the iR Engine that periodically gathers, processes, and stores statistical information about platform usage. It collects metrics such as active user counts, running instances, chat activity, and other operational data. By regularly capturing these statistics, the collector creates a valuable historical record that can be used for monitoring platform health, analyzing usage patterns, and making data-driven decisions. This chapter explores the implementation, functionality, and integration of the analytics data collector within the iR Engine's background processing system.

## Purpose and Functionality

The analytics data collector serves several important purposes:

- **Usage monitoring**: Tracks how many users are active on the platform
- **Resource utilization**: Measures how many instances, channels, and other resources are in use
- **Trend analysis**: Creates time-series data for analyzing usage patterns over time
- **Performance metrics**: Gathers data that can indicate system performance
- **Business intelligence**: Provides insights for product and business decisions

The collector typically gathers metrics such as:

- Number of active users
- Count of running instances (virtual spaces)
- Active chat channels and messages
- Unique locations or scenes being used
- Server resource utilization
- Feature usage statistics

## Implementation

### Task Initialization

The analytics data collector is initialized when the task server application starts:

```typescript
// from src/start.ts
import collectAnalytics from './collect-analytics';

export const start = async (): Promise<Application> => {
  const app = await createFeathersKoaApp(ServerMode.Task);

  // Initialize the analytics collector
  collectAnalytics(app);

  // Other initialization code...

  return app;
};
```

The `collectAnalytics` function receives the application instance, which provides access to services, database connections, and other resources needed for data collection.

### Collector Setup

The collector is implemented in `src/collect-analytics.ts`:

```typescript
// simplified from src/collect-analytics.ts
import config from '@ir-engine/server-core/src/appconfig';
import multiLogger from '@ir-engine/server-core/src/ServerLogger';

const logger = multiLogger.child({ component: 'taskserver:collect-analytics' });

// Get the collection interval from configuration
const DEFAULT_INTERVAL_SECONDS = 1800; // 30 minutes
const configuredInterval = parseInt(config['task-server'].processInterval);
const runIntervalMilliseconds = (configuredInterval || DEFAULT_INTERVAL_SECONDS) * 1000;

// Main export function that sets up the collector
export default (app) => {
  // Schedule periodic execution
  setInterval(async () => {
    try {
      await collectAndStoreData(app);
    } catch (error) {
      logger.error('Error during analytics collection:', error);
    }
  }, runIntervalMilliseconds);

  logger.info(`Analytics collection scheduled to run every ${runIntervalMilliseconds / 1000} seconds`);
};
```

This function:

- Imports the configuration to determine how often to run
- Sets up a periodic timer using `setInterval`
- Calls the `collectAndStoreData` function at each interval
- Includes error handling to prevent task failures (see the sketch below for a related refinement)
- Logs the collection schedule for monitoring
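One caveat with plain `setInterval` is that it fires on a fixed clock whether or not the previous cycle has finished, so a slow collection run could overlap the next one. The following sketch is not from the iR Engine source; it reuses the `app`, `logger`, `collectAndStoreData`, and `runIntervalMilliseconds` names from the snippet above and shows one way to guard against overlapping cycles:

```typescript
// Illustrative sketch: skip a tick if the previous collection cycle
// is still running, instead of letting cycles overlap.
let collectionInProgress = false;

setInterval(async () => {
  if (collectionInProgress) {
    // Assumed logger from the snippet above
    logger.warn('Previous analytics cycle still running; skipping this tick');
    return;
  }
  collectionInProgress = true;
  try {
    await collectAndStoreData(app);
  } catch (error) {
    logger.error('Error during analytics collection:', error);
  } finally {
    // Always clear the flag so the next tick can run
    collectionInProgress = false;
  }
}, runIntervalMilliseconds);
```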
### Data Collection and Storage

The `collectAndStoreData` function performs the actual data gathering and storage:

```typescript
// conceptual implementation of collectAndStoreData
async function collectAndStoreData(app) {
  logger.info('Starting analytics collection cycle');

  // Data collection

  // Example: get count of active channels using a Feathers service
  const activeChannelsList = await app.service('channel').find({
    paginate: false,
    query: { active: true }
  });
  const activeChannelsCount = activeChannelsList.length;
  logger.info(`Found ${activeChannelsCount} active channels`);

  // Example: get count of active users in instances using Knex
  const knex = app.get('knexClient');
  const activeInstanceUsersResult = await knex
    .count('* as count')
    .from('instance-attendance')
    .where('ended', false);
  const activeInstanceUsersCount = activeInstanceUsersResult[0].count;
  logger.info(`Found ${activeInstanceUsersCount} users in instances`);

  // Example: get count of active instances
  const activeInstancesResult = await knex
    .count('* as count')
    .from('instance')
    .where('ended', false);
  const activeInstancesCount = activeInstancesResult[0].count;
  logger.info(`Found ${activeInstancesCount} active instances`);

  // Data storage

  // Store the collected metrics
  await app.service('analytics').create({
    type: 'activeChannels',
    count: activeChannelsCount,
    timestamp: new Date()
  });

  await app.service('analytics').create({
    type: 'instanceUsers',
    count: activeInstanceUsersCount,
    timestamp: new Date()
  });

  await app.service('analytics').create({
    type: 'activeInstances',
    count: activeInstancesCount,
    timestamp: new Date()
  });

  logger.info('Analytics collection cycle complete');
}
```

This function:

- Queries various data sources to gather metrics
- Uses Feathers services for some data (e.g., channels)
- Uses direct database queries via Knex for other data
- Processes the raw data (e.g., counting items)
- Stores each metric in the analytics service
- Logs the process for monitoring and debugging

## Data Collection Methods

The analytics data collector uses several methods to gather data.

### Feathers Services

For data already managed by Feathers services, the collector uses service methods:

```typescript
// Example of using a Feathers service
const activeChannelsList = await app.service('channel').find({
  paginate: false,
  query: { active: true }
});
const activeChannelsCount = activeChannelsList.length;
```

This approach:

- Leverages existing service logic and validation
- Respects service-level access controls
- Follows the established service interaction pattern
- May include additional processing like filtering or counting

### Direct Database Queries

For more complex or performance-sensitive queries, the collector may use Knex:

```typescript
// Example of using Knex for direct database access
const knex = app.get('knexClient');
const activeInstanceUsersResult = await knex
  .count('* as count')
  .from('instance-attendance')
  .where('ended', false);
const activeInstanceUsersCount = activeInstanceUsersResult[0].count;
```

This approach:

- Allows for optimized SQL queries
- Can perform aggregations at the database level
- May be more efficient for large datasets
- Bypasses service-level processing for direct access

### External API Calls

For metrics from external systems, the collector may use API clients:

```typescript
// Example of querying an external system
const k8sApi = app.get('k8sApi');
const nodeMetrics = await k8sApi.listNodeMetrics();
const totalCpuUsage = nodeMetrics.items.reduce((sum, node) => {
  return sum + node.usage.cpu;
}, 0);
```

This approach:

- Gathers data from systems outside the main application
- May include infrastructure metrics from Kubernetes
- Could integrate with third-party monitoring services
- Requires handling external API authentication and rate limits
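Because the individual metric queries above are independent of one another, a collector can also issue them concurrently rather than one at a time. A minimal sketch, reusing the `app` handle, service names, and table names from the examples above:

```typescript
// Illustrative sketch: run independent metric queries concurrently
// with Promise.all instead of awaiting each one in sequence.
const knex = app.get('knexClient');

const [activeChannelsList, activeInstancesResult] = await Promise.all([
  app.service('channel').find({ paginate: false, query: { active: true } }),
  knex.count('* as count').from('instance').where('ended', false)
]);

const activeChannelsCount = activeChannelsList.length;
const activeInstancesCount = activeInstancesResult[0].count;
```

Concurrent queries shorten each collection cycle, which matters when the interval is short or the queries are slow; the trade-off is a brief burst of simultaneous load on the database.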
## Data Storage and Retrieval

The collected analytics data is typically stored in a structured format.

### Storage Format

Each analytics record usually includes:

```typescript
// Typical analytics record structure
interface AnalyticsRecord {
  type: string;       // The metric type (e.g., 'activeUsers')
  count: number;      // The numeric value
  timestamp: Date;    // When the measurement was taken
  metadata?: object;  // Optional additional context
}
```

This structure:

- Identifies the specific metric being recorded
- Provides the actual measurement value
- Timestamps the data for time-series analysis
- May include additional context in `metadata`

### Storage Mechanism

The collector typically uses a Feathers service to store the data:

```typescript
// Storing data via the analytics service
await app.service('analytics').create({
  type: 'activeChannels',
  count: activeChannelsCount,
  timestamp: new Date()
});
```

This service might:

- Store the data in a database table
- Apply any necessary transformations
- Handle authentication and authorization
- Emit events that other parts of the system can listen for

### Data Retrieval and Analysis

The stored analytics data can later be retrieved for analysis:

```typescript
// Example of retrieving analytics data
const lastWeekAnalytics = await app.service('analytics').find({
  query: {
    type: 'activeUsers',
    timestamp: {
      $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // last 7 days
    },
    $sort: {
      timestamp: 1 // ascending order by timestamp
    }
  }
});
```

This data can then be:

- Displayed in administrative dashboards
- Used to generate reports
- Analyzed for trends and patterns
- Monitored for anomalies or issues

## Collection Workflow

The complete analytics collection workflow follows these steps:

```mermaid
sequenceDiagram
    participant Timer as Interval Timer
    participant Collector as Analytics Collector
    participant Services as Feathers Services
    participant Database as Database (via Knex)
    participant Analytics as Analytics Service
    participant Storage as Analytics Storage

    Timer->>Collector: Trigger collection cycle
    Collector->>Services: Query service data (app.service().find())
    Services-->>Collector: Return service data
    Collector->>Database: Execute direct queries (knex.count())
    Database-->>Collector: Return query results
    Collector->>Collector: Process and aggregate data
    Collector->>Analytics: Store metrics (app.service('analytics').create())
    Analytics->>Storage: Persist analytics records
    Storage-->>Analytics: Confirm storage
    Analytics-->>Collector: Confirm creation
    Collector-->>Timer: Complete cycle
    Note over Timer: Wait for next interval
```

This diagram illustrates:

1. The timer triggers the collection cycle
2. The collector queries various data sources
3. The collector processes the raw data
4. The metrics are stored via the analytics service
5. The cycle completes and waits for the next interval

## Integration with Other Components

The analytics data collector integrates with several other components of the background processing system.

### Task Server Application

The collector is initialized and managed by the task server application:

```typescript
// from src/start.ts
collectAnalytics(app);
```

This integration:

- Provides the collector with the application context
- Ensures the collector starts when the task server starts
- Gives the collector access to services and resources

### Application Configuration Management

The collector uses configuration to determine its behavior:

```typescript
// from src/collect-analytics.ts
const configuredInterval = parseInt(config['task-server'].processInterval);
```

This integration:

- Allows the collection interval to be configured
- Enables environment-specific settings
- Supports runtime adjustments without code changes

### Service Interaction Layer

The collector interacts with various services to gather and store data:

```typescript
// Examples of service interaction
const channels = await app.service('channel').find({ /* query */ });
await app.service('analytics').create({ /* data */ });
```

This integration:

- Leverages the established service architecture
- Ensures consistent data access patterns
- Respects service-level security and validation
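Since every metric write goes through the same `app.service('analytics').create` call, the repeated calls in `collectAndStoreData` can be wrapped in a small helper. The `storeMetric` function below is hypothetical (not part of the iR Engine source) and assumes the `AnalyticsRecord` shape described earlier:

```typescript
// Hypothetical helper: centralize metric writes so every record
// follows the AnalyticsRecord shape consistently.
async function storeMetric(
  app,
  type: string,
  count: number,
  metadata?: object
): Promise<void> {
  await app.service('analytics').create({
    type,
    count,
    timestamp: new Date(),
    ...(metadata ? { metadata } : {})
  });
}

// Usage inside a collection cycle:
await storeMetric(app, 'activeChannels', activeChannelsCount);
await storeMetric(app, 'instanceUsers', activeInstanceUsersCount);
await storeMetric(app, 'activeInstances', activeInstancesCount);
```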
### Periodic Task Scheduler

For more complex scheduling needs, the collector might use a dedicated scheduler:

```typescript
// Conceptual integration with a scheduler
import { scheduleTask } from './scheduler';

export default (app) => {
  scheduleTask('analytics-collection', {
    interval: config['task-server'].processInterval || 1800,
    handler: async () => {
      await collectAndStoreData(app);
    }
  });
};
```

This integration:

- Provides more sophisticated scheduling options
- Centralizes task management
- May offer better monitoring and reliability

## Benefits of Analytics Collection

The analytics data collector provides several key benefits:

- **Data-driven decisions**: Provides quantitative data for product and business decisions
- **Operational awareness**: Creates visibility into platform usage and health
- **Trend identification**: Enables recognition of usage patterns over time
- **Capacity planning**: Helps predict resource needs based on growth trends
- **Issue detection**: Can identify anomalies that might indicate problems
- **Performance optimization**: Highlights areas that might need optimization
- **Business metrics**: Provides data for KPIs and business reporting

These benefits make the analytics data collector a valuable component of the iR Engine's background processing system.

## Next Steps

With an understanding of how the analytics data collector periodically gathers and stores metrics, the next chapter explores a more general approach to scheduling and managing periodic tasks.

Next: Periodic Task Scheduler